Incrementally build a parking lot platform: begin with a clean single-floor design, extend to multi-entry concurrent operations, and finally wrap it with resiliency patterns that protect external integrations.
Parking Campus ├─ Level 1: Floor F0 → Slots S1..Sn ├─ Level 2: Entry Dispatchers → Floor Locks → Assignments └─ Level 3: Entry Service → CircuitBreaker(Payment) → Ticketing Fallback
Level 1 — Core Design
Focus: single-floor service using repository, strategy, and guard chain patterns while emitting domain events.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, Optional, Protocol
class SpotRepository(Protocol):
    """Structural interface for spot storage backends."""
    def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
        """Return vacant spots, optionally restricted to one level."""
        ...
    def get(self, spot_id: str) -> Optional["Spot"]:
        """Look up a spot by id; None when unknown."""
        ...
    def update(self, spot_id: str, spot: "Spot") -> None:
        """Persist the (possibly mutated) spot under *spot_id*."""
        ...
@dataclass
class Spot:
    """A single parking slot and its current occupancy state."""
    spot_id: str
    level: int
    size: str  # one of "compact" | "regular" | "large" (see VehicleSizeGuard.ORDER)
    state: str = "VACANT"  # lifecycle: VACANT -> HELD -> OCCUPIED -> VACANT
    vehicle_id: Optional[str] = None  # set while OCCUPIED, else None
@dataclass
class AllocationRequest:
    """Inbound request to park one vehicle for a given duration."""
    vehicle_id: str
    size: str  # vehicle size class; spot must be at least this large
    duration_hours: int
    preferred_level: Optional[int] = None  # None means no level preference
@dataclass
class AllocationReceipt:
    """Outcome of a successful allocation: assigned spot and fee charged."""
    spot_id: str
    fee: float
class EventBus:
    """Minimal synchronous publish/subscribe hub keyed by event name."""

    def __init__(self) -> None:
        # event name -> ordered list of handler callables
        self._subscribers: Dict[str, list[Callable[[dict], None]]] = {}

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        """Register *handler* to run on every publication of *event*."""
        handlers = self._subscribers.setdefault(event, [])
        handlers.append(handler)

    def publish(self, event: str, payload: dict) -> None:
        """Synchronously invoke every handler subscribed to *event*."""
        for subscriber in self._subscribers.get(event, []):
            subscriber(payload)
class BaseGuard:
    """Chain-of-responsibility link; subclasses override check()."""

    def __init__(self) -> None:
        self._next: Optional["BaseGuard"] = None  # next link, if any

    def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
        """Append *nxt* after this guard; return it so calls chain."""
        self._next = nxt
        return nxt

    def check(self, context: dict) -> Optional[str]:
        """Return a failure code, or None to approve. Default approves."""
        return None

    def handle(self, context: dict) -> Optional[str]:
        """Run this guard; on approval delegate to the next link."""
        failure = self.check(context)
        if failure:
            return failure
        if self._next is None:
            return failure
        return self._next.handle(context)
class VehicleSizeGuard(BaseGuard):
    """Rejects spots smaller than the requesting vehicle."""

    ORDER = {"compact": 0, "regular": 1, "large": 2}

    def check(self, context: dict) -> Optional[str]:
        request: AllocationRequest = context["request"]
        spot: Spot = context["spot"]
        spot_rank = self.ORDER.get(spot.size)
        vehicle_rank = self.ORDER.get(request.size)
        if spot_rank is None or vehicle_rank is None:
            return "UNKNOWN_SIZE"
        if spot_rank < vehicle_rank:
            return "SIZE_MISMATCH"
        return None
class SpotAvailabilityGuard(BaseGuard):
    """Approves only spots still in the VACANT state."""

    def check(self, context: dict) -> Optional[str]:
        return None if context["spot"].state == "VACANT" else "TAKEN"
class SpotSelector(Protocol):
    """Strategy interface: pick one spot from the candidate pool."""
    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        """Return the chosen spot, or None when no candidate suits."""
        ...
class NearestLevelSelector:
    """Chooses the candidate whose level is closest to the preferred one.

    Without a preference every candidate scores zero, so the first
    candidate offered wins. Ties also go to the earliest candidate.
    """

    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        preference = request.preferred_level

        def distance(spot: Spot) -> int:
            if preference is None:
                return 0
            return abs(preference - spot.level)

        # min() is stable: the first candidate with minimal distance wins.
        return min(candidates, key=distance, default=None)
class PricingStrategy(Protocol):
    """Strategy interface: compute the fee for an allocation request."""
    def compute(self, request: AllocationRequest) -> float:
        """Return the fee to charge for *request*."""
        ...
class FlatRatePricing(PricingStrategy):
    """Hourly flat rate per size class; a minimum of one hour is billed."""

    RATES = {"compact": 5, "regular": 8, "large": 12}

    def compute(self, request: AllocationRequest) -> float:
        billable_hours = request.duration_hours if request.duration_hours > 1 else 1
        return self.RATES[request.size] * billable_hours
class SpotLifecycle:
    """Guarded state machine for one spot: VACANT -> HELD -> OCCUPIED -> VACANT."""

    def __init__(self, spot: Spot) -> None:
        self.spot = spot

    def _transition(self, expected: str, target: str) -> bool:
        # Move to *target* only when the spot is currently in *expected*.
        if self.spot.state != expected:
            return False
        self.spot.state = target
        return True

    def reserve(self) -> bool:
        """VACANT -> HELD."""
        return self._transition("VACANT", "HELD")

    def occupy(self, vehicle_id: str) -> bool:
        """HELD -> OCCUPIED, recording the vehicle."""
        if not self._transition("HELD", "OCCUPIED"):
            return False
        self.spot.vehicle_id = vehicle_id
        return True

    def vacate(self) -> bool:
        """OCCUPIED -> VACANT, clearing the vehicle."""
        if not self._transition("OCCUPIED", "VACANT"):
            return False
        self.spot.vehicle_id = None
        return True
class InMemorySpotRepository(SpotRepository):
    """Dict-backed repository keyed by spot id."""

    def __init__(self, spots: Iterable[Spot]):
        self._spots: Dict[str, Spot] = {}
        for spot in spots:
            self._spots[spot.spot_id] = spot

    def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
        """Vacant spots, optionally filtered to a single level."""
        matches = []
        for spot in self._spots.values():
            if spot.state != "VACANT":
                continue
            if level is not None and spot.level != level:
                continue
            matches.append(spot)
        return matches

    def get(self, spot_id: str) -> Optional[Spot]:
        return self._spots.get(spot_id)

    def update(self, spot_id: str, spot: Spot) -> None:
        self._spots[spot_id] = spot

    def all(self) -> Iterable[Spot]:
        """Every spot, regardless of state."""
        return self._spots.values()
class ParkingLotService:
    """Facade tying repository, selection, guards, pricing, and events together."""

    def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus) -> None:
        self.repo = repo
        self.selector = selector
        self.pricing = pricing
        self.bus = bus
        # Guard chain: size compatibility first, then availability.
        self.guard = VehicleSizeGuard()
        self.guard.set_next(SpotAvailabilityGuard())

    def park(self, request: AllocationRequest) -> Optional[AllocationReceipt]:
        """Allocate a spot for *request*; None when nothing suitable exists."""
        spot = self.selector.choose(request, self.repo.available(level=request.preferred_level))
        if spot is None:
            return None
        if self.guard.handle({"request": request, "spot": spot}):
            return None
        lifecycle = SpotLifecycle(spot)
        if not (lifecycle.reserve() and lifecycle.occupy(request.vehicle_id)):
            return None
        self.repo.update(spot.spot_id, spot)
        fee = self.pricing.compute(request)
        self.bus.publish(
            "parking.allocated",
            {"vehicle": request.vehicle_id, "spot": spot.spot_id, "fee": fee},
        )
        return AllocationReceipt(spot_id=spot.spot_id, fee=fee)

    def release(self, spot_id: str) -> bool:
        """Vacate an occupied spot; False when unknown or not occupied."""
        spot = self.repo.get(spot_id)
        if spot is None:
            return False
        if not SpotLifecycle(spot).vacate():
            return False
        self.repo.update(spot.spot_id, spot)
        self.bus.publish("parking.released", {"spot": spot.spot_id})
        return True
class StdoutBus(EventBus):
    """EventBus variant that mirrors each publication to stdout."""

    def publish(self, event: str, payload: dict) -> None:
        # Deliver to subscribers first, then echo for observability.
        super().publish(event, payload)
        message = f"[event] {event}: {payload}"
        print(message)
def main() -> None:
    """Demo: park one vehicle, release it, and dump the inventory."""
    repo = InMemorySpotRepository(
        [
            Spot("C1", level=1, size="compact"),
            Spot("R1", level=1, size="regular"),
            Spot("L2", level=2, size="large"),
        ]
    )
    service = ParkingLotService(repo, NearestLevelSelector(), FlatRatePricing(), StdoutBus())
    request = AllocationRequest(vehicle_id="CAR-42", size="regular", duration_hours=3, preferred_level=1)
    receipt = service.park(request)
    print("Receipt:", receipt)
    if receipt is not None:
        service.release(receipt.spot_id)
    inventory = [(spot.spot_id, spot.state, spot.vehicle_id) for spot in repo.all()]
    print("Inventory:", inventory)
# Run the demo when executed as a script.
if __name__ == "__main__":
    main()
Level 2 — Event-Driven Campus
Extension: coordinate multi-entry dispatch with an EventBus, round-robin floor strategy, and command-driven release flow.
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Optional, Protocol, Tuple
class EventBus:
    """Queued pub/sub: publish() enqueues; pump() drains and delivers."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        """Register *handler* for *event*."""
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        """Buffer the event; nothing is delivered until pump() runs."""
        self._queue.append((event, payload))

    def pump(self) -> None:
        """Deliver queued events in FIFO order until the queue is empty.

        Handlers may publish further events; those are drained in the
        same call because the loop re-checks the queue.
        """
        while self._queue:
            event, payload = self._queue.popleft()
            # Snapshot so handlers subscribing mid-delivery don't mutate
            # the sequence we are iterating.
            for handler in tuple(self._subscribers.get(event, ())):
                handler(payload)
class Command(Protocol):
    """Deferred action interface (Command pattern)."""
    def execute(self) -> None:
        """Perform the encapsulated action."""
        ...
class PublishCommand:
    """Command that publishes a pre-built event when executed."""

    def __init__(self, bus: EventBus, event: str, payload: dict) -> None:
        self.bus = bus
        self.event = event
        self.payload = payload

    def execute(self) -> None:
        """Fire the captured event on the captured bus."""
        self.bus.publish(self.event, self.payload)
class SpotRepository(Protocol):
    """Structural interface for spot storage backends."""
    def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
        """Return vacant spots, optionally restricted to one level."""
        ...
    def get(self, spot_id: str) -> Optional["Spot"]:
        """Look up a spot by id; None when unknown."""
        ...
    def update(self, spot_id: str, spot: "Spot") -> None:
        """Persist the (possibly mutated) spot under *spot_id*."""
        ...
    def levels(self) -> Iterable[int]:
        """Distinct levels present in the inventory."""
        ...
@dataclass
class Spot:
    """A single parking slot and its current occupancy state."""
    spot_id: str
    level: int
    size: str  # one of "compact" | "regular" | "large" (see VehicleSizeGuard.ORDER)
    state: str = "VACANT"  # lifecycle: VACANT -> HELD -> OCCUPIED -> VACANT
    vehicle_id: Optional[str] = None  # set while OCCUPIED, else None
@dataclass
class AllocationRequest:
    """Inbound request to park one vehicle for a given duration."""
    vehicle_id: str
    size: str  # vehicle size class; spot must be at least this large
    duration_hours: int
    preferred_level: Optional[int] = None  # None means no level preference
@dataclass
class AllocationReceipt:
    """Outcome of a successful allocation: assigned spot and fee charged."""
    spot_id: str
    fee: float
class BaseGuard:
    """Chain-of-responsibility link; subclasses override check()."""

    def __init__(self) -> None:
        self._next: Optional["BaseGuard"] = None  # next link, if any

    def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
        """Append *nxt* after this guard; return it so calls chain."""
        self._next = nxt
        return nxt

    def check(self, context: dict) -> Optional[str]:
        """Return a failure code, or None to approve. Default approves."""
        return None

    def handle(self, context: dict) -> Optional[str]:
        """Run this guard; on approval delegate to the next link."""
        failure = self.check(context)
        if failure:
            return failure
        if self._next is None:
            return failure
        return self._next.handle(context)
class VehicleSizeGuard(BaseGuard):
    """Rejects spots smaller than the requesting vehicle."""

    ORDER = {"compact": 0, "regular": 1, "large": 2}

    def check(self, context: dict) -> Optional[str]:
        request: AllocationRequest = context["request"]
        spot: Spot = context["spot"]
        spot_rank = self.ORDER.get(spot.size)
        vehicle_rank = self.ORDER.get(request.size)
        if spot_rank is None or vehicle_rank is None:
            return "UNKNOWN_SIZE"
        if spot_rank < vehicle_rank:
            return "SIZE_MISMATCH"
        return None
class SpotAvailabilityGuard(BaseGuard):
    """Approves only spots still in the VACANT state."""

    def check(self, context: dict) -> Optional[str]:
        return None if context["spot"].state == "VACANT" else "TAKEN"
class SpotLifecycle:
    """Guarded state machine for one spot: VACANT -> HELD -> OCCUPIED -> VACANT."""

    def __init__(self, spot: Spot) -> None:
        self.spot = spot

    def _transition(self, expected: str, target: str) -> bool:
        # Move to *target* only when the spot is currently in *expected*.
        if self.spot.state != expected:
            return False
        self.spot.state = target
        return True

    def reserve(self) -> bool:
        """VACANT -> HELD."""
        return self._transition("VACANT", "HELD")

    def occupy(self, vehicle_id: str) -> bool:
        """HELD -> OCCUPIED, recording the vehicle."""
        if not self._transition("HELD", "OCCUPIED"):
            return False
        self.spot.vehicle_id = vehicle_id
        return True

    def vacate(self) -> bool:
        """OCCUPIED -> VACANT, clearing the vehicle."""
        if not self._transition("OCCUPIED", "VACANT"):
            return False
        self.spot.vehicle_id = None
        return True
class InMemorySpotRepository(SpotRepository):
    """Dict-backed repository keyed by spot id."""

    def __init__(self, spots: Iterable[Spot]):
        self._spots: Dict[str, Spot] = {}
        for spot in spots:
            self._spots[spot.spot_id] = spot

    def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
        """Vacant spots, optionally filtered to a single level."""
        matches = []
        for spot in self._spots.values():
            if spot.state != "VACANT":
                continue
            if level is not None and spot.level != level:
                continue
            matches.append(spot)
        return matches

    def get(self, spot_id: str) -> Optional[Spot]:
        return self._spots.get(spot_id)

    def update(self, spot_id: str, spot: Spot) -> None:
        self._spots[spot_id] = spot

    def levels(self) -> Iterable[int]:
        """Distinct levels present, ascending."""
        return sorted({spot.level for spot in self._spots.values()})
class SpotSelector(Protocol):
    """Strategy interface: pick one spot from the candidate pool."""
    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        """Return the chosen spot, or None when no candidate suits."""
        ...
class RoundRobinLevelSelector:
    """Rotates across floors so consecutive allocations spread the load."""

    def __init__(self, levels: Iterable[int]):
        self.order = list(levels)  # rotation order of levels
        self._cursor = 0  # advances on every level considered

    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        by_level: DefaultDict[int, List[Spot]] = defaultdict(list)
        for candidate in candidates:
            by_level[candidate.level].append(candidate)
        if not by_level:
            return None
        if not self.order:
            # Lazily derive a rotation order when none was configured.
            self.order = sorted(by_level.keys())
        for _ in self.order:
            level = self.order[self._cursor % len(self.order)]
            self._cursor += 1
            pool = by_level.get(level)
            if pool:
                return pool[0]
        # No configured level had stock; fall back to the candidate whose
        # level is closest to the preference (or the first rotation level).
        origin = request.preferred_level
        if origin is None:
            origin = self.order[0]
        return min(
            (spot for pool in by_level.values() for spot in pool),
            key=lambda spot: abs(origin - spot.level),
            default=None,
        )
class PricingStrategy(Protocol):
    """Strategy interface: compute the fee for an allocation request."""
    def compute(self, request: AllocationRequest) -> float:
        """Return the fee to charge for *request*."""
        ...
class FlatRatePricing(PricingStrategy):
    """Hourly flat rate per size class; a minimum of one hour is billed."""

    RATES = {"compact": 5, "regular": 8, "large": 12}

    def compute(self, request: AllocationRequest) -> float:
        billable_hours = request.duration_hours if request.duration_hours > 1 else 1
        return self.RATES[request.size] * billable_hours
class ParkingCampusService:
    """Event-driven allocation service reacting to gate requests and checkouts."""

    def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus):
        self.repo = repo
        self.selector = selector
        self.pricing = pricing
        self.bus = bus
        # Guard chain: size compatibility first, then availability.
        self.guard = VehicleSizeGuard()
        self.guard.set_next(SpotAvailabilityGuard())
        bus.subscribe("parking.requested", self._handle_request)
        bus.subscribe("parking.checkout", self._handle_checkout)

    def _reject(self, vehicle_id: str, reason: str) -> None:
        # Single exit point for every rejection outcome.
        self.bus.publish("parking.rejected", {"vehicle": vehicle_id, "reason": reason})

    def _handle_request(self, payload: dict) -> None:
        """Try to allocate a spot; emit parking.allocated or parking.rejected."""
        request: AllocationRequest = payload["request"]
        candidates = list(self.repo.available(level=request.preferred_level))
        spot = self.selector.choose(request, candidates)
        if spot is None:
            self._reject(request.vehicle_id, "NO_SPOT")
            return
        failure = self.guard.handle({"request": request, "spot": spot})
        if failure:
            self._reject(request.vehicle_id, failure)
            return
        lifecycle = SpotLifecycle(spot)
        if not (lifecycle.reserve() and lifecycle.occupy(request.vehicle_id)):
            self._reject(request.vehicle_id, "STATE")
            return
        self.repo.update(spot.spot_id, spot)
        fee = self.pricing.compute(request)
        self.bus.publish(
            "parking.allocated",
            {
                "vehicle": request.vehicle_id,
                "spot": spot.spot_id,
                "fee": fee,
                "gate": payload.get("gate"),
                "receipt": AllocationReceipt(spot_id=spot.spot_id, fee=fee),
            },
        )

    def _handle_checkout(self, payload: dict) -> None:
        """Vacate the spot named in the payload, if it is occupied."""
        spot = self.repo.get(payload["spot_id"])
        if spot is None:
            return
        if SpotLifecycle(spot).vacate():
            self.repo.update(spot.spot_id, spot)
            self.bus.publish("parking.released", {"spot": spot.spot_id})
class EntryGate:
    """A physical entry point that wraps requests into publish commands."""

    def __init__(self, gate_id: str, bus: EventBus) -> None:
        self.gate_id = gate_id
        self.bus = bus

    def enqueue(self, request: AllocationRequest) -> Command:
        """Build (but do not execute) the command announcing *request*."""
        return PublishCommand(
            self.bus,
            "parking.requested",
            {"request": request, "gate": self.gate_id},
        )
class ReleaseCommand(Command):
    """Command that requests checkout of a specific spot."""

    def __init__(self, bus: EventBus, spot_id: str):
        self.bus = bus
        self.spot_id = spot_id

    def execute(self) -> None:
        payload = {"spot_id": self.spot_id}
        self.bus.publish("parking.checkout", payload)
class AllocationProjector:
    """Read model: tracks current vehicle->spot assignments and rejections."""

    def __init__(self, bus: EventBus) -> None:
        self.assignments: Dict[str, str] = {}
        self.rejections: List[Tuple[str, str]] = []
        bus.subscribe("parking.allocated", self._on_allocated)
        bus.subscribe("parking.rejected", self._on_rejected)
        bus.subscribe("parking.released", self._on_released)

    def _on_allocated(self, payload: dict) -> None:
        self.assignments[payload["vehicle"]] = payload["spot"]

    def _on_rejected(self, payload: dict) -> None:
        self.rejections.append((payload["vehicle"], payload["reason"]))

    def _on_released(self, payload: dict) -> None:
        # Drop the single assignment bound to the released spot, if any.
        freed = payload["spot"]
        owner = next((v for v, s in self.assignments.items() if s == freed), None)
        if owner is not None:
            del self.assignments[owner]
def main() -> None:
    """Demo: route four requests through three gates, then free one spot."""
    repo = InMemorySpotRepository(
        [
            Spot("C1", level=0, size="compact"),
            Spot("C2", level=0, size="compact"),
            Spot("R1", level=1, size="regular"),
            Spot("R2", level=1, size="regular"),
            Spot("L1", level=2, size="large"),
        ]
    )
    bus = EventBus()
    campus = ParkingCampusService(repo, RoundRobinLevelSelector(repo.levels()), FlatRatePricing(), bus)
    projector = AllocationProjector(bus)
    gates = [EntryGate(gate_id, bus) for gate_id in ("G1", "G2", "G3")]
    requests = [
        AllocationRequest(vehicle_id="CAR-1", size="regular", duration_hours=2, preferred_level=1),
        AllocationRequest(vehicle_id="CAR-2", size="regular", duration_hours=1),
        AllocationRequest(vehicle_id="BIKE-3", size="compact", duration_hours=3, preferred_level=0),
        AllocationRequest(vehicle_id="SUV-4", size="large", duration_hours=4, preferred_level=2),
    ]
    # Doubling the gate list lets zip pair every request with a gate.
    for gate, request in zip(gates * 2, requests):
        gate.enqueue(request).execute()
    bus.pump()
    if projector.assignments:
        first_spot = next(iter(projector.assignments.values()))
        ReleaseCommand(bus, first_spot).execute()
        bus.pump()
    print("Assignments:", projector.assignments)
    print("Rejections:", projector.rejections)
# Run the demo when executed as a script.
if __name__ == "__main__":
    main()
Level 3 — Resilient Orchestration
Extension: drive the event-driven campus through a saga that layers payment retry, circuit breaker, and command-based fallbacks.
from __future__ import annotations
import random
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Optional, Protocol, Tuple
class EventBus:
    """Queued pub/sub: publish() enqueues; pump() drains and delivers."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        """Register *handler* for *event*."""
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        """Buffer the event; nothing is delivered until pump() runs."""
        self._queue.append((event, payload))

    def pump(self) -> None:
        """Deliver queued events in FIFO order until the queue is empty.

        Handlers may publish further events; those are drained in the
        same call because the loop re-checks the queue.
        """
        while self._queue:
            event, payload = self._queue.popleft()
            # Snapshot so handlers subscribing mid-delivery don't mutate
            # the sequence we are iterating.
            for handler in tuple(self._subscribers.get(event, ())):
                handler(payload)
class Command(Protocol):
    """Deferred action interface (Command pattern)."""
    def execute(self) -> None:
        """Perform the encapsulated action."""
        ...
class PublishCommand:
    """Command that publishes a pre-built event when executed."""

    def __init__(self, bus: EventBus, event: str, payload: dict) -> None:
        self.bus = bus
        self.event = event
        self.payload = payload

    def execute(self) -> None:
        """Fire the captured event on the captured bus."""
        self.bus.publish(self.event, self.payload)
class ReleaseCommand(Command):
    """Command that requests checkout of a specific spot."""

    def __init__(self, bus: EventBus, spot_id: str):
        self.bus = bus
        self.spot_id = spot_id

    def execute(self) -> None:
        payload = {"spot_id": self.spot_id}
        self.bus.publish("parking.checkout", payload)
class SpotRepository(Protocol):
    """Structural interface for spot storage backends."""
    def available(self, *, level: Optional[int] = None) -> Iterable["Spot"]:
        """Return vacant spots, optionally restricted to one level."""
        ...
    def get(self, spot_id: str) -> Optional["Spot"]:
        """Look up a spot by id; None when unknown."""
        ...
    def update(self, spot_id: str, spot: "Spot") -> None:
        """Persist the (possibly mutated) spot under *spot_id*."""
        ...
    def levels(self) -> Iterable[int]:
        """Distinct levels present in the inventory."""
        ...
@dataclass
class Spot:
    """A single parking slot and its current occupancy state."""
    spot_id: str
    level: int
    size: str  # one of "compact" | "regular" | "large" (see VehicleSizeGuard.ORDER)
    state: str = "VACANT"  # lifecycle: VACANT -> HELD -> OCCUPIED -> VACANT
    vehicle_id: Optional[str] = None  # set while OCCUPIED, else None
@dataclass
class AllocationRequest:
    """Inbound request to park one vehicle for a given duration."""
    vehicle_id: str
    size: str  # vehicle size class; spot must be at least this large
    duration_hours: int
    preferred_level: Optional[int] = None  # None means no level preference
@dataclass
class AllocationReceipt:
    """Outcome of a successful allocation: assigned spot and fee charged."""
    spot_id: str
    fee: float
class BaseGuard:
    """Chain-of-responsibility link; subclasses override check()."""

    def __init__(self) -> None:
        self._next: Optional["BaseGuard"] = None  # next link, if any

    def set_next(self, nxt: "BaseGuard") -> "BaseGuard":
        """Append *nxt* after this guard; return it so calls chain."""
        self._next = nxt
        return nxt

    def check(self, context: dict) -> Optional[str]:
        """Return a failure code, or None to approve. Default approves."""
        return None

    def handle(self, context: dict) -> Optional[str]:
        """Run this guard; on approval delegate to the next link."""
        failure = self.check(context)
        if failure:
            return failure
        if self._next is None:
            return failure
        return self._next.handle(context)
class VehicleSizeGuard(BaseGuard):
    """Rejects spots smaller than the requesting vehicle."""

    ORDER = {"compact": 0, "regular": 1, "large": 2}

    def check(self, context: dict) -> Optional[str]:
        request: AllocationRequest = context["request"]
        spot: Spot = context["spot"]
        spot_rank = self.ORDER.get(spot.size)
        vehicle_rank = self.ORDER.get(request.size)
        if spot_rank is None or vehicle_rank is None:
            return "UNKNOWN_SIZE"
        if spot_rank < vehicle_rank:
            return "SIZE_MISMATCH"
        return None
class SpotAvailabilityGuard(BaseGuard):
    """Approves only spots still in the VACANT state."""

    def check(self, context: dict) -> Optional[str]:
        return None if context["spot"].state == "VACANT" else "TAKEN"
class SpotLifecycle:
    """Guarded state machine for one spot: VACANT -> HELD -> OCCUPIED -> VACANT."""

    def __init__(self, spot: Spot) -> None:
        self.spot = spot

    def _transition(self, expected: str, target: str) -> bool:
        # Move to *target* only when the spot is currently in *expected*.
        if self.spot.state != expected:
            return False
        self.spot.state = target
        return True

    def reserve(self) -> bool:
        """VACANT -> HELD."""
        return self._transition("VACANT", "HELD")

    def occupy(self, vehicle_id: str) -> bool:
        """HELD -> OCCUPIED, recording the vehicle."""
        if not self._transition("HELD", "OCCUPIED"):
            return False
        self.spot.vehicle_id = vehicle_id
        return True

    def vacate(self) -> bool:
        """OCCUPIED -> VACANT, clearing the vehicle."""
        if not self._transition("OCCUPIED", "VACANT"):
            return False
        self.spot.vehicle_id = None
        return True
class InMemorySpotRepository(SpotRepository):
    """Dict-backed repository keyed by spot id."""

    def __init__(self, spots: Iterable[Spot]):
        self._spots: Dict[str, Spot] = {}
        for spot in spots:
            self._spots[spot.spot_id] = spot

    def available(self, *, level: Optional[int] = None) -> Iterable[Spot]:
        """Vacant spots, optionally filtered to a single level."""
        matches = []
        for spot in self._spots.values():
            if spot.state != "VACANT":
                continue
            if level is not None and spot.level != level:
                continue
            matches.append(spot)
        return matches

    def get(self, spot_id: str) -> Optional[Spot]:
        return self._spots.get(spot_id)

    def update(self, spot_id: str, spot: Spot) -> None:
        self._spots[spot_id] = spot

    def levels(self) -> Iterable[int]:
        """Distinct levels present, ascending."""
        return sorted({spot.level for spot in self._spots.values()})
class SpotSelector(Protocol):
    """Strategy interface: pick one spot from the candidate pool."""
    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        """Return the chosen spot, or None when no candidate suits."""
        ...
class RoundRobinLevelSelector:
    """Rotates across floors so consecutive allocations spread the load."""

    def __init__(self, levels: Iterable[int]):
        self.order = list(levels)  # rotation order of levels
        self._cursor = 0  # advances on every level considered

    def choose(self, request: AllocationRequest, candidates: Iterable[Spot]) -> Optional[Spot]:
        by_level: DefaultDict[int, List[Spot]] = defaultdict(list)
        for candidate in candidates:
            by_level[candidate.level].append(candidate)
        if not by_level:
            return None
        if not self.order:
            # Lazily derive a rotation order when none was configured.
            self.order = sorted(by_level.keys())
        for _ in self.order:
            level = self.order[self._cursor % len(self.order)]
            self._cursor += 1
            pool = by_level.get(level)
            if pool:
                return pool[0]
        # No configured level had stock; fall back to the candidate whose
        # level is closest to the preference (or the first rotation level).
        origin = request.preferred_level
        if origin is None:
            origin = self.order[0]
        return min(
            (spot for pool in by_level.values() for spot in pool),
            key=lambda spot: abs(origin - spot.level),
            default=None,
        )
class PricingStrategy(Protocol):
    """Strategy interface: compute the fee for an allocation request."""
    def compute(self, request: AllocationRequest) -> float:
        """Return the fee to charge for *request*."""
        ...
class FlatRatePricing(PricingStrategy):
    """Hourly flat rate per size class; a minimum of one hour is billed."""

    RATES = {"compact": 5, "regular": 8, "large": 12}

    def compute(self, request: AllocationRequest) -> float:
        billable_hours = request.duration_hours if request.duration_hours > 1 else 1
        return self.RATES[request.size] * billable_hours
class ParkingCampusService:
    """Event-driven allocation service reacting to gate requests and checkouts."""

    def __init__(self, repo: SpotRepository, selector: SpotSelector, pricing: PricingStrategy, bus: EventBus):
        self.repo = repo
        self.selector = selector
        self.pricing = pricing
        self.bus = bus
        # Guard chain: size compatibility first, then availability.
        self.guard = VehicleSizeGuard()
        self.guard.set_next(SpotAvailabilityGuard())
        bus.subscribe("parking.requested", self._handle_request)
        bus.subscribe("parking.checkout", self._handle_checkout)

    def _reject(self, vehicle_id: str, reason: str) -> None:
        # Single exit point for every rejection outcome.
        self.bus.publish("parking.rejected", {"vehicle": vehicle_id, "reason": reason})

    def _handle_request(self, payload: dict) -> None:
        """Try to allocate a spot; emit parking.allocated or parking.rejected."""
        request: AllocationRequest = payload["request"]
        candidates = list(self.repo.available(level=request.preferred_level))
        spot = self.selector.choose(request, candidates)
        if spot is None:
            self._reject(request.vehicle_id, "NO_SPOT")
            return
        failure = self.guard.handle({"request": request, "spot": spot})
        if failure:
            self._reject(request.vehicle_id, failure)
            return
        lifecycle = SpotLifecycle(spot)
        if not (lifecycle.reserve() and lifecycle.occupy(request.vehicle_id)):
            self._reject(request.vehicle_id, "STATE")
            return
        self.repo.update(spot.spot_id, spot)
        fee = self.pricing.compute(request)
        self.bus.publish(
            "parking.allocated",
            {
                "vehicle": request.vehicle_id,
                "spot": spot.spot_id,
                "fee": fee,
                "gate": payload.get("gate"),
                "receipt": AllocationReceipt(spot_id=spot.spot_id, fee=fee),
            },
        )

    def _handle_checkout(self, payload: dict) -> None:
        """Vacate the spot named in the payload, if it is occupied."""
        spot = self.repo.get(payload["spot_id"])
        if spot is None:
            return
        if SpotLifecycle(spot).vacate():
            self.repo.update(spot.spot_id, spot)
            self.bus.publish("parking.released", {"spot": spot.spot_id})
class PaymentGateway:
    """Simulated payment provider that fails randomly to exercise resiliency."""

    def __init__(self, failure_rate: float = 0.35):
        self.failure_rate = failure_rate  # probability of a simulated timeout

    def charge(self, vehicle_id: str, amount: float) -> str:
        """Charge *amount*; raises RuntimeError on a simulated timeout."""
        roll = random.random()
        if roll < self.failure_rate:
            raise RuntimeError("payment timeout")
        timestamp_ms = int(time.time() * 1000)
        return f"receipt::{vehicle_id}::{timestamp_ms}::{amount:.2f}"
class CircuitBreaker:
    """CLOSED/OPEN/HALF_OPEN breaker around a callable returning str."""

    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold  # failures before opening
        self.recovery_timeout = recovery_timeout  # seconds before a probe call
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"

    def call(self, func: Callable[[], str]) -> str:
        """Invoke *func* through the breaker; raises while the circuit is open."""
        if self.state == "OPEN":
            elapsed = time.time() - self.last_failure_time
            if elapsed < self.recovery_timeout:
                raise RuntimeError("Circuit open")
            # Recovery window elapsed: allow one probe call.
            self.state = "HALF_OPEN"
        try:
            result = func()
        except Exception:
            self._record_failure()
            raise
        # Any success fully closes the circuit again.
        self.failure_count = 0
        self.state = "CLOSED"
        return result

    def _record_failure(self) -> None:
        # Count the failure; open the circuit once the threshold is reached.
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"
class RetryPolicy:
    """Retries a callable with linearly growing backoff between attempts."""

    def __init__(self, attempts: int, backoff: float):
        self.attempts = attempts  # total attempts, including the first call
        self.backoff = backoff  # base delay in seconds; grows linearly

    def call(self, func: Callable[[], str]) -> str:
        """Invoke *func* up to *attempts* times; re-raise the last failure.

        Fix: the original slept even after the FINAL failed attempt,
        delaying the raised error for no benefit. Backoff now only
        happens between attempts.
        """
        last_error: Optional[Exception] = None
        for attempt in range(self.attempts):
            try:
                return func()
            except Exception as exc:
                last_error = exc
                # Sleep only if another attempt will follow.
                if attempt + 1 < self.attempts:
                    time.sleep(self.backoff * (attempt + 1))
        if last_error:
            raise last_error
        # attempts <= 0: nothing was ever tried.
        raise RuntimeError("retry exhausted")
class PaymentProcessor:
    """Listens for allocations and charges them via breaker + retry."""

    def __init__(self, bus: EventBus, gateway: PaymentGateway, breaker: CircuitBreaker, retry: RetryPolicy):
        self.bus = bus
        self.gateway = gateway
        self.breaker = breaker
        self.retry = retry
        bus.subscribe("parking.allocated", self._handle_allocated)

    def _handle_allocated(self, payload: dict) -> None:
        """Charge the allocation fee; emit confirmed or payment_failed."""
        vehicle, spot_id, amount = payload["vehicle"], payload["spot"], payload["fee"]

        def guarded_charge() -> str:
            # Every retry attempt still goes through the circuit breaker.
            return self.breaker.call(lambda: self.gateway.charge(vehicle, amount))

        try:
            receipt = self.retry.call(guarded_charge)
            self.bus.publish(
                "parking.confirmed",
                {"vehicle": vehicle, "spot": spot_id, "receipt": receipt},
            )
        except Exception as exc:
            self.bus.publish(
                "parking.payment_failed",
                {"vehicle": vehicle, "spot": spot_id, "reason": str(exc)},
            )
class ManualTicketCommand(Command):
    """Fallback command issuing an offline ticket when payment fails."""

    def __init__(self, vehicle_id: str, spot_id: str):
        self.vehicle_id = vehicle_id
        self.spot_id = spot_id
        self.ticket: Optional[str] = None  # populated by execute()

    def execute(self) -> None:
        """Mint the manual ticket identifier."""
        self.ticket = f"manual-ticket::{self.vehicle_id}@{self.spot_id}"
class ParkingSaga:
    """Compensates failed payments: issue a manual ticket and free the spot."""

    def __init__(self, bus: EventBus):
        self.bus = bus
        bus.subscribe("parking.payment_failed", self._compensate)

    def _compensate(self, payload: dict) -> None:
        """Run the compensation chain for one failed payment."""
        vehicle, spot_id = payload["vehicle"], payload["spot"]
        # First mint the offline ticket, then hand the spot back.
        ticket_cmd = ManualTicketCommand(vehicle, spot_id)
        ticket_cmd.execute()
        ReleaseCommand(self.bus, spot_id).execute()
        self.bus.publish(
            "parking.fallback",
            {
                "vehicle": vehicle,
                "spot": spot_id,
                "ticket": ticket_cmd.ticket,
                "reason": payload["reason"],
            },
        )
class EntryGate:
    """A physical entry point that wraps requests into publish commands."""

    def __init__(self, gate_id: str, bus: EventBus) -> None:
        self.gate_id = gate_id
        self.bus = bus

    def enqueue(self, request: AllocationRequest) -> Command:
        """Build (but do not execute) the command announcing *request*."""
        return PublishCommand(
            self.bus,
            "parking.requested",
            {"request": request, "gate": self.gate_id},
        )
class AllocationProjection:
    """Read model of the saga: pending, confirmed, fallback, rejected."""

    def __init__(self, bus: EventBus) -> None:
        self.pending: Dict[str, str] = {}  # allocated, payment outstanding
        self.confirmed: Dict[str, str] = {}  # payment succeeded
        self.fallbacks: Dict[str, str] = {}  # vehicle -> manual ticket
        self.rejections: List[Tuple[str, str]] = []
        bus.subscribe("parking.allocated", self._on_allocated)
        bus.subscribe("parking.confirmed", self._on_confirmed)
        bus.subscribe("parking.fallback", self._on_fallback)
        bus.subscribe("parking.released", self._on_released)
        bus.subscribe("parking.rejected", self._on_rejected)

    def _on_allocated(self, payload: dict) -> None:
        self.pending[payload["vehicle"]] = payload["spot"]

    def _on_confirmed(self, payload: dict) -> None:
        vehicle = payload["vehicle"]
        self.confirmed[vehicle] = payload["spot"]
        self.pending.pop(vehicle, None)

    def _on_fallback(self, payload: dict) -> None:
        vehicle = payload["vehicle"]
        self.fallbacks[vehicle] = payload["ticket"]
        self.pending.pop(vehicle, None)

    def _on_released(self, payload: dict) -> None:
        # A released spot clears every matching pending/confirmed entry.
        freed = payload["spot"]
        for bucket in (self.pending, self.confirmed):
            stale = [vehicle for vehicle, spot in bucket.items() if spot == freed]
            for vehicle in stale:
                del bucket[vehicle]

    def _on_rejected(self, payload: dict) -> None:
        self.rejections.append((payload["vehicle"], payload["reason"]))
class ParkingMetrics:
    """Counts terminal outcomes for a quick operational snapshot."""

    def __init__(self, bus: EventBus) -> None:
        self.snapshot: Dict[str, int] = {"confirmed": 0, "fallback": 0, "rejected": 0}
        bus.subscribe("parking.confirmed", self._on_confirmed)
        bus.subscribe("parking.fallback", self._on_fallback)
        bus.subscribe("parking.rejected", self._on_rejected)

    def _bump(self, key: str) -> None:
        # Shared increment path for all three counters.
        self.snapshot[key] += 1

    def _on_confirmed(self, _: dict) -> None:
        self._bump("confirmed")

    def _on_fallback(self, _: dict) -> None:
        self._bump("fallback")

    def _on_rejected(self, _: dict) -> None:
        self._bump("rejected")
def main() -> None:
    """Demo: six requests through two gates with payment resiliency."""
    random.seed(42)  # deterministic gateway failures for the demo
    repo = InMemorySpotRepository(
        [
            Spot("C1", level=0, size="compact"),
            Spot("R1", level=1, size="regular"),
            Spot("R2", level=1, size="regular"),
            Spot("L1", level=2, size="large"),
            Spot("L2", level=2, size="large"),
        ]
    )
    bus = EventBus()
    # These register themselves on the bus; no handle is needed afterwards.
    ParkingCampusService(repo, RoundRobinLevelSelector(repo.levels()), FlatRatePricing(), bus)
    PaymentProcessor(bus, PaymentGateway(failure_rate=0.4), CircuitBreaker(3, 1.5), RetryPolicy(3, 0.05))
    ParkingSaga(bus)
    projection = AllocationProjection(bus)
    metrics = ParkingMetrics(bus)
    gates = [EntryGate("Gate-A", bus), EntryGate("Gate-B", bus)]
    requests = [
        AllocationRequest("CAR-101", "regular", 2, preferred_level=1),
        AllocationRequest("SUV-202", "large", 4, preferred_level=2),
        AllocationRequest("BIKE-303", "compact", 1, preferred_level=0),
        AllocationRequest("CAR-404", "regular", 3),
        AllocationRequest("SUV-505", "large", 5, preferred_level=2),
        AllocationRequest("TRUCK-606", "large", 2),
    ]
    # Tripling the gate list lets zip pair every request with a gate.
    for gate, request in zip(gates * 3, requests):
        gate.enqueue(request).execute()
    bus.pump()
    confirmed_snapshot = dict(projection.confirmed)
    for spot_id in list(projection.confirmed.values())[:1]:
        ReleaseCommand(bus, spot_id).execute()
        bus.pump()
    print("Confirmed:", confirmed_snapshot)
    print("Fallback:", projection.fallbacks)
    print("Pending:", projection.pending)
    print("Rejections:", projection.rejections)
    print("Metrics:", metrics.snapshot)
# Run the demo when executed as a script.
if __name__ == "__main__":
    main()
Incrementally deliver a food ordering platform: start with observer-driven notifications, scale to prioritized concurrent dispatch, and finally orchestrate resilient fulfillment with sagas and circuit breakers.
Food Delivery Platform ├─ Level 1: OrderService → Observers ├─ Level 2: Priority Queue Dispatchers → Partner Utilization └─ Level 3: Saga Orchestrator → Payment Breaker → Courier Assignment
Level 1 — Core Ordering Flow
Focus: order life-cycle with observer notifications and in-memory repository.
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Protocol
class DeliveryObserver(Protocol):
    """Observer interface for order lifecycle notifications."""
    def notify(self, order_id: str, details: str) -> None:
        """React to an update about *order_id*."""
        ...
@dataclass
class DeliveryPartner:
    """Observer that records and echoes every notification it receives."""
    partner_id: str
    notifications: List[str] = field(default_factory=list)

    def notify(self, order_id: str, details: str) -> None:
        """Record the update and print it for visibility."""
        entry = f"Order {order_id}: {details}"
        self.notifications.append(entry)
        banner = f"Notify {self.partner_id}: {entry}"
        print(banner)
@dataclass
class Order:
    """A customer order; status starts at PLACED."""
    order_id: str
    restaurant: str
    status: str = "PLACED"  # updated via OrderRepository.update_status
class OrderRepository:
    """In-memory order store keyed by order id."""

    def __init__(self):
        self.orders: Dict[str, Order] = {}

    def save(self, order: Order) -> None:
        """Insert or overwrite *order*."""
        self.orders[order.order_id] = order

    def update_status(self, order_id: str, status: str) -> None:
        """Set the status of an existing order; KeyError when unknown."""
        record = self.orders[order_id]
        record.status = status
class OrderService:
    """Order lifecycle facade that fans notifications out to observers."""

    def __init__(self, repository: OrderRepository):
        self.repo = repository
        self.observers: List[DeliveryObserver] = []

    def register_partner(self, partner: DeliveryObserver) -> None:
        """Add a partner to the notification list."""
        self.observers.append(partner)

    def _broadcast(self, order_id: str, details: str) -> None:
        # Notify every registered observer in registration order.
        for observer in self.observers:
            observer.notify(order_id, details)

    def place_order(self, order_id: str, restaurant: str) -> Order:
        """Create, persist, and announce a new order."""
        order = Order(order_id, restaurant)
        self.repo.save(order)
        self._broadcast(order_id, f"from {restaurant} awaiting pickup")
        return order

    def mark_picked(self, order_id: str) -> None:
        """Advance the order to PICKED and announce it."""
        self.repo.update_status(order_id, "PICKED")
        self._broadcast(order_id, "picked up")
if __name__ == "__main__":
    # Demo: two partners observe one order through placement and pickup.
    repo = OrderRepository()
    service = OrderService(repo)
    partners = [DeliveryPartner("DP1"), DeliveryPartner("DP2")]
    for partner in partners:
        service.register_partner(partner)
    service.place_order("O1", "Spicy Bites")
    service.mark_picked("O1")
Level 2 — Concurrent Dispatch
Extension: reuse the order service with a priority dispatch coordinator and concurrent partner balancing.
from __future__ import annotations
import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Protocol
class DeliveryObserver(Protocol):
    """Observer interface for order lifecycle notifications."""
    def notify(self, order_id: str, details: str) -> None:
        """React to an update about *order_id*."""
        ...
@dataclass
class DeliveryPartner:
    """Observer that records and echoes every notification it receives."""
    partner_id: str
    notifications: List[str] = field(default_factory=list)

    def notify(self, order_id: str, details: str) -> None:
        """Record the update and print it for visibility."""
        entry = f"Order {order_id}: {details}"
        self.notifications.append(entry)
        banner = f"Notify {self.partner_id}: {entry}"
        print(banner)
@dataclass
class Order:
    """A customer order with a dispatch priority."""
    order_id: str
    restaurant: str
    priority: int = 1  # used by the dispatcher to order work
    status: str = "PLACED"  # updated via OrderRepository.update_status
@dataclass(order=True)
class DispatchItem:
    """Heap entry: items compare by (sort_key, created_at), never by order."""
    sort_key: int  # negated order priority (see enqueue), so higher priority sorts first
    created_at: float  # presumably insertion time, as a FIFO tie-break — TODO confirm
    order: Order = field(compare=False)  # payload; excluded from comparisons
class OrderRepository:
    """In-memory order store keyed by order id."""

    def __init__(self) -> None:
        self.orders: Dict[str, Order] = {}

    def save(self, order: Order) -> None:
        """Insert or overwrite *order*."""
        self.orders[order.order_id] = order

    def update_status(self, order_id: str, status: str) -> None:
        """Set the status of an existing order; KeyError when unknown."""
        record = self.orders[order_id]
        record.status = status

    def get(self, order_id: str) -> Order:
        """Return the order; KeyError when unknown."""
        return self.orders[order_id]
class OrderService:
    """Order lifecycle facade: persists orders, notifies partners, feeds the dispatcher."""

    def __init__(self, repository: OrderRepository) -> None:
        self.repo = repository
        self.observers: List[DeliveryObserver] = []
        self.dispatcher: Optional["PriorityDispatchCoordinator"] = None

    def register_partner(self, partner: DeliveryObserver) -> None:
        """Subscribe a partner to all order notifications."""
        self.observers.append(partner)

    def attach_dispatcher(self, dispatcher: "PriorityDispatchCoordinator") -> None:
        """Wire in the coordinator that assigns partners asynchronously."""
        self.dispatcher = dispatcher

    def _broadcast(self, order_id: str, details: str) -> None:
        # Same message to every observer, in registration order.
        for observer in self.observers:
            observer.notify(order_id, details)

    def place_order(self, order_id: str, restaurant: str, priority: int = 1) -> Order:
        """Persist a new order, notify partners, and enqueue it for dispatch."""
        order = Order(order_id, restaurant, priority)
        self.repo.save(order)
        self._broadcast(order_id, f"from {restaurant} awaiting pickup")
        if self.dispatcher:
            self.dispatcher.enqueue(order)
        return order

    def mark_picked(self, order_id: str) -> None:
        """Record the pickup, notify partners, and free the assigned partner's slot."""
        self.repo.update_status(order_id, "PICKED")
        self._broadcast(order_id, "picked up")
        if self.dispatcher:
            self.dispatcher.complete(order_id)

    def cancel_order(self, order_id: str, reason: str = "CANCELLED") -> None:
        """Store the cancellation reason as the status and release the assignment."""
        self.repo.update_status(order_id, reason)
        self._broadcast(order_id, f"cancelled: {reason}")
        if self.dispatcher:
            self.dispatcher.complete(order_id)
class PriorityDispatchCoordinator:
    """Worker pool that pops queued orders by priority and assigns the least-loaded partner.

    Heap ordering is (-priority, enqueue_time): higher priority first, FIFO
    within a priority. Daemon workers start immediately in __init__.
    """

    def __init__(self, service: OrderService, partners: Dict[str, DeliveryPartner], worker_count: int = 3):
        self.service = service
        self.partners = partners
        # partner_id -> in-flight assignment count; guarded by load_lock.
        self.partner_load: Dict[str, int] = {pid: 0 for pid in partners}
        # Min-heap of DispatchItem; guarded by queue_cond's lock.
        self.queue: List[DispatchItem] = []
        self.queue_cond = threading.Condition()
        self.load_lock = threading.Lock()
        # order_id -> partner_id.
        # NOTE(review): written by workers and popped in complete() with no
        # lock; and if complete() runs before the worker stores the
        # assignment, the pop returns None and partner_load is never
        # decremented — confirm this leak is acceptable for the demo.
        self.assignments: Dict[str, str] = {}
        self.running = True
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(worker_count)]
        for worker in self.workers:
            worker.start()

    def enqueue(self, order: Order) -> None:
        """Push an order onto the priority heap and wake one worker."""
        with self.queue_cond:
            sort_key = -order.priority  # negate: heapq is a min-heap
            heapq.heappush(self.queue, DispatchItem(sort_key, time.time(), order))
            self.queue_cond.notify()

    def _pick_partner(self) -> DeliveryPartner:
        """Reserve the partner with the fewest in-flight assignments."""
        with self.load_lock:
            partner_id = min(self.partner_load, key=self.partner_load.get)
            self.partner_load[partner_id] += 1
            return self.partners[partner_id]

    def complete(self, order_id: str) -> None:
        """Release the partner slot held for *order_id* (no-op if never assigned)."""
        partner_id = self.assignments.pop(order_id, None)
        if partner_id:
            with self.load_lock:
                self.partner_load[partner_id] -= 1

    def _worker(self) -> None:
        """Worker loop: wait for items, assign a partner, notify, pause briefly."""
        while self.running:
            with self.queue_cond:
                while not self.queue and self.running:
                    self.queue_cond.wait()
                if not self.running:
                    return
                item = heapq.heappop(self.queue)
            # Processing happens outside the condition lock so enqueue() and
            # other workers are not blocked during assignment.
            partner = self._pick_partner()
            self.assignments[item.order.order_id] = partner.partner_id
            partner.notify(item.order.order_id, f"pickup ready from {item.order.restaurant}")
            time.sleep(0.1)  # simulated handling time

    def shutdown(self) -> None:
        """Signal workers to exit and join them briefly (they are daemons regardless)."""
        self.running = False
        with self.queue_cond:
            self.queue_cond.notify_all()
        for worker in self.workers:
            worker.join(timeout=0.2)
def main() -> None:
    """Demo: enqueue four orders with rising priority, then complete them all."""
    repo = OrderRepository()
    service = OrderService(repo)
    partners = {
        "DP1": DeliveryPartner("DP1"),
        "DP2": DeliveryPartner("DP2"),
        "DP3": DeliveryPartner("DP3"),
    }
    for partner in partners.values():
        service.register_partner(partner)
    dispatcher = PriorityDispatchCoordinator(service, partners, worker_count=2)
    service.attach_dispatcher(dispatcher)
    # O4 gets priority 4 (highest) and should be dispatched first.
    for priority, order_id in enumerate(["O1", "O2", "O3", "O4"], start=1):
        service.place_order(order_id, "Fusion Kitchen", priority)
    time.sleep(0.4)  # let the workers drain the queue
    for order_id in ["O1", "O2", "O3", "O4"]:
        service.mark_picked(order_id)
    dispatcher.shutdown()


if __name__ == "__main__":
    main()
Level 3 — Resilient Fulfillment
Extension: orchestrate the Level 2 dispatcher with saga, retries, and circuit breaker protection around payment and courier assignment.
from __future__ import annotations
import heapq
import random
import threading
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Deque, Dict, List, Optional, Protocol
class DeliveryObserver(Protocol):
    """Structural interface for anything that receives order status updates."""

    def notify(self, order_id: str, details: str) -> None:
        """Handle a status update for *order_id*."""
@dataclass
class DeliveryPartner:
    """Courier that records and echoes every notification it receives."""

    partner_id: str
    notifications: List[str] = field(default_factory=list)

    def notify(self, order_id: str, details: str) -> None:
        """Store the formatted update locally and print it to stdout."""
        entry = f"Order {order_id}: {details}"
        self.notifications.append(entry)
        print(f"Notify {self.partner_id}: {entry}")
@dataclass
class Order:
    """Mutable order record stored in the repository."""

    order_id: str         # unique order key
    restaurant: str       # originating restaurant name
    priority: int = 1     # negated for the dispatch heap, so larger values are served first
    status: str = "PLACED"  # lifecycle: PLACED -> PICKED, or FAILED on saga rollback
@dataclass(order=True)
class DispatchItem:
    """Heap entry: ordering uses (sort_key, created_at); the order itself never compares."""

    sort_key: int        # negated priority so heapq (a min-heap) pops highest priority first
    created_at: float    # enqueue timestamp breaks priority ties (FIFO)
    order: Order = field(compare=False)
class OrderRepository:
    """In-memory store of orders keyed by order id."""

    def __init__(self) -> None:
        self.orders: Dict[str, Order] = {}

    def save(self, order: Order) -> None:
        """Insert or replace the order under its own id."""
        self.orders[order.order_id] = order

    def update_status(self, order_id: str, status: str) -> None:
        """Mutate the stored order's status (KeyError if unknown)."""
        stored = self.orders[order_id]
        stored.status = status

    def get(self, order_id: str) -> Order:
        """Return the stored order (KeyError if unknown)."""
        return self.orders[order_id]
class OrderService:
    """Order lifecycle facade: persists orders, notifies partners, feeds the dispatcher."""

    def __init__(self, repository: OrderRepository) -> None:
        self.repo = repository
        self.observers: List[DeliveryObserver] = []
        self.dispatcher: Optional["PriorityDispatchCoordinator"] = None

    def register_partner(self, partner: DeliveryObserver) -> None:
        """Subscribe a partner to all order notifications."""
        self.observers.append(partner)

    def attach_dispatcher(self, dispatcher: "PriorityDispatchCoordinator") -> None:
        """Wire in the coordinator that assigns partners asynchronously."""
        self.dispatcher = dispatcher

    def _broadcast(self, order_id: str, details: str) -> None:
        # Same message to every observer, in registration order.
        for observer in self.observers:
            observer.notify(order_id, details)

    def place_order(self, order_id: str, restaurant: str, priority: int = 1) -> Order:
        """Persist a new order, notify partners, and enqueue it for dispatch."""
        order = Order(order_id, restaurant, priority)
        self.repo.save(order)
        self._broadcast(order_id, f"from {restaurant} awaiting pickup")
        if self.dispatcher:
            self.dispatcher.enqueue(order)
        return order

    def mark_picked(self, order_id: str) -> None:
        """Record the pickup, notify partners, and free the assigned partner's slot."""
        self.repo.update_status(order_id, "PICKED")
        self._broadcast(order_id, "picked up")
        if self.dispatcher:
            self.dispatcher.complete(order_id)

    def cancel_order(self, order_id: str, reason: str = "CANCELLED") -> None:
        """Store the cancellation reason as the status and release the assignment."""
        self.repo.update_status(order_id, reason)
        self._broadcast(order_id, f"cancelled: {reason}")
        if self.dispatcher:
            self.dispatcher.complete(order_id)
class PriorityDispatchCoordinator:
    """Worker pool that pops queued orders by priority and assigns the least-loaded partner.

    Heap ordering is (-priority, enqueue_time): higher priority first, FIFO
    within a priority. Daemon workers start immediately in __init__.
    """

    def __init__(self, service: OrderService, partners: Dict[str, DeliveryPartner], worker_count: int = 3):
        self.service = service
        self.partners = partners
        # partner_id -> in-flight assignment count; guarded by load_lock.
        self.partner_load: Dict[str, int] = {pid: 0 for pid in partners}
        # Min-heap of DispatchItem; guarded by queue_cond's lock.
        self.queue: List[DispatchItem] = []
        self.queue_cond = threading.Condition()
        self.load_lock = threading.Lock()
        # order_id -> partner_id.
        # NOTE(review): written by workers and popped in complete() with no
        # lock; if complete() runs before the worker stores the assignment,
        # partner_load is never decremented — confirm acceptable for the demo.
        self.assignments: Dict[str, str] = {}
        self.running = True
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(worker_count)]
        for worker in self.workers:
            worker.start()

    def enqueue(self, order: Order) -> None:
        """Push an order onto the priority heap and wake one worker."""
        with self.queue_cond:
            sort_key = -order.priority  # negate: heapq is a min-heap
            heapq.heappush(self.queue, DispatchItem(sort_key, time.time(), order))
            self.queue_cond.notify()

    def _pick_partner(self) -> DeliveryPartner:
        """Reserve the partner with the fewest in-flight assignments."""
        with self.load_lock:
            partner_id = min(self.partner_load, key=self.partner_load.get)
            self.partner_load[partner_id] += 1
            return self.partners[partner_id]

    def complete(self, order_id: str) -> None:
        """Release the partner slot held for *order_id* (no-op if never assigned)."""
        partner_id = self.assignments.pop(order_id, None)
        if partner_id:
            with self.load_lock:
                self.partner_load[partner_id] -= 1

    def _worker(self) -> None:
        """Worker loop: wait for items, assign a partner, notify, pause briefly."""
        while self.running:
            with self.queue_cond:
                while not self.queue and self.running:
                    self.queue_cond.wait()
                if not self.running:
                    return
                item = heapq.heappop(self.queue)
            # Processing happens outside the condition lock so enqueue() and
            # other workers are not blocked during assignment.
            partner = self._pick_partner()
            self.assignments[item.order.order_id] = partner.partner_id
            partner.notify(item.order.order_id, f"pickup ready from {item.order.restaurant}")
            time.sleep(0.1)  # simulated handling time

    def shutdown(self) -> None:
        """Signal workers to exit and join them briefly (they are daemons regardless)."""
        self.running = False
        with self.queue_cond:
            self.queue_cond.notify_all()
        for worker in self.workers:
            worker.join(timeout=0.2)
class CircuitBreaker:
    """Classic CLOSED/OPEN/HALF_OPEN breaker guarding the payment call.

    After `failure_threshold` consecutive failures the breaker opens and
    rejects calls for `recovery_timeout` seconds, then admits a probe.
    """

    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold   # consecutive failures that trip the breaker
        self.recovery_timeout = recovery_timeout     # seconds before an OPEN breaker probes again
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func: Callable[[], str]) -> str:
        """Invoke *func* through the breaker; raises RuntimeError while open."""
        self._gate()
        try:
            result = func()
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _gate(self) -> None:
        # Reject fast while OPEN; allow a probe once the cool-down has elapsed.
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("Payment service unavailable (circuit open)")

    def _on_failure(self) -> None:
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"

    def _on_success(self) -> None:
        with self.lock:
            self.failure_count = 0
            self.state = "CLOSED"
class RetryPolicy:
    """Retries a callable with exponential backoff (base_delay * 2**attempt)."""

    def __init__(self, attempts: int, base_delay: float):
        self.attempts = attempts       # total tries, including the first
        self.base_delay = base_delay   # seconds before the first retry

    def call(self, func: Callable[[], str]) -> str:
        """Run *func* up to `attempts` times; re-raises the last error on exhaustion.

        Fix: do not sleep after the final failed attempt — the original slept
        one extra backoff period before giving up, adding pointless latency.
        """
        last_err: Optional[Exception] = None
        for attempt in range(self.attempts):
            try:
                return func()
            except Exception as exc:
                last_err = exc
                if attempt + 1 < self.attempts:
                    time.sleep(self.base_delay * (2 ** attempt))
        raise last_err if last_err else RuntimeError("Unknown failure")
class SagaStep:
    """Pairs a forward action with the compensation that undoes it."""

    def __init__(self, action: Callable[[], str], compensate: Callable[[], None]):
        # action returns a human-readable result; compensate rolls it back.
        self.action = action
        self.compensate = compensate
class SagaOrchestrator:
    """Runs steps in order; on failure compensates the completed steps in reverse."""

    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    def execute(self) -> str:
        """Return "SAGA_COMPLETED" on success, "SAGA_ROLLED_BACK" after undoing."""
        done: List[SagaStep] = []
        try:
            for step in self.steps:
                outcome = step.action()
                done.append(step)
                print(outcome)
        except Exception as exc:
            print(f"Saga failed: {exc}. Rolling back…")
            for step in reversed(done):
                step.compensate()
            return "SAGA_ROLLED_BACK"
        return "SAGA_COMPLETED"
class PaymentGateway:
    """Simulated payment processor that fails with a configurable probability."""

    def __init__(self, failure_rate: float = 0.5):
        self.failure_rate = failure_rate  # probability that a charge is declined

    def charge(self, amount: float) -> str:
        """Charge *amount*; returns a receipt id or raises RuntimeError."""
        roll = random.random()
        if roll < self.failure_rate:
            raise RuntimeError("Payment declined")
        return f"receipt-{int(time.time() * 1000)}"
class CourierPool:
    """Thread-safe FIFO pool of courier ids."""

    def __init__(self, couriers: List[str]):
        self.available: Deque[str] = deque(couriers)
        self.lock = threading.Lock()

    def acquire(self) -> str:
        """Take the next free courier; raises RuntimeError when the pool is empty."""
        with self.lock:
            if not self.available:
                raise RuntimeError("No courier available")
            return self.available.popleft()

    def release(self, courier_id: str) -> None:
        """Return a courier to the back of the queue."""
        with self.lock:
            self.available.append(courier_id)
@dataclass
class FulfillmentContext:
    """Per-order saga state telling each compensator whether there is anything to undo."""

    order_id: str
    restaurant_reserved: bool = False        # set once the restaurant step ran
    courier_id: Optional[str] = None         # courier currently held for this order
    payment_receipt: Optional[str] = None    # receipt id once payment was captured
class ResilientFoodCoordinator:
    """Runs fulfilment as a saga: reserve restaurant → charge payment → assign courier.

    Payment goes through retry + circuit breaker; any step failure triggers
    compensation of the already-completed steps in reverse order.
    """

    def __init__(self,
                 service: OrderService,
                 dispatcher: PriorityDispatchCoordinator,
                 payment_gateway: PaymentGateway,
                 breaker: CircuitBreaker,
                 retry_policy: RetryPolicy,
                 couriers: CourierPool):
        self.service = service
        self.dispatcher = dispatcher
        self.payment_gateway = payment_gateway
        self.breaker = breaker
        self.retry_policy = retry_policy
        self.couriers = couriers
        self.metrics_lock = threading.Lock()
        self.metrics = {"success": 0, "rolled_back": 0}
        # Orders whose saga rolled back. NOTE(review): appended without
        # metrics_lock — fine for the single-threaded demo, not thread-safe.
        self.dead_letter: List[str] = []

    def fulfill(self, order_id: str, restaurant: str, priority: int, amount: float) -> str:
        """Place the order, run the fulfilment saga, and record the outcome.

        Returns the saga outcome string ("SAGA_COMPLETED" / "SAGA_ROLLED_BACK").
        """
        order = self.service.place_order(order_id, restaurant, priority)
        ctx = FulfillmentContext(order_id=order_id)

        # Saga step closures: each action mutates ctx so its compensator
        # knows whether there is anything to undo.
        def reserve_restaurant() -> str:
            ctx.restaurant_reserved = True
            return f"Restaurant reserved for {order_id}"

        def cancel_restaurant() -> None:
            if ctx.restaurant_reserved:
                print(f"Compensate: release restaurant for {order_id}")
                ctx.restaurant_reserved = False

        def charge_payment() -> str:
            # Retry wraps the breaker: every retry attempt consults the
            # breaker first, so an opened circuit fails remaining attempts fast.
            receipt = self.retry_policy.call(
                lambda: self.breaker.call(lambda: self.payment_gateway.charge(amount))
            )
            ctx.payment_receipt = receipt
            return f"Payment captured {receipt}"

        def refund_payment() -> None:
            if ctx.payment_receipt:
                print(f"Compensate: refund {ctx.payment_receipt} for {order_id}")
                ctx.payment_receipt = None

        def assign_courier() -> str:
            courier_id = self.couriers.acquire()
            ctx.courier_id = courier_id
            return f"Courier {courier_id} assigned to {order_id}"

        def release_courier() -> None:
            if ctx.courier_id:
                print(f"Compensate: release courier {ctx.courier_id} for {order_id}")
                self.couriers.release(ctx.courier_id)
                ctx.courier_id = None

        saga = SagaOrchestrator([
            SagaStep(reserve_restaurant, cancel_restaurant),
            SagaStep(charge_payment, refund_payment),
            SagaStep(assign_courier, release_courier),
        ])
        outcome = saga.execute()
        if outcome == "SAGA_COMPLETED":
            self.service.mark_picked(order_id)
            # Demo shortcut: the courier returns to the pool right after
            # pickup rather than after a delivery phase.
            if ctx.courier_id:
                self.couriers.release(ctx.courier_id)
                ctx.courier_id = None
            with self.metrics_lock:
                self.metrics["success"] += 1
        else:
            self.service.cancel_order(order_id, "FAILED")
            self.dead_letter.append(order_id)
            with self.metrics_lock:
                self.metrics["rolled_back"] += 1
        return outcome

    def snapshot(self) -> Dict[str, int]:
        """Thread-safe copy of the success/rollback counters."""
        with self.metrics_lock:
            return dict(self.metrics)
def main() -> None:
    """Demo: fulfil three orders through the saga with a flaky payment gateway."""
    random.seed(9)  # deterministic failure pattern for the demo
    repo = OrderRepository()
    service = OrderService(repo)
    partners = {
        "DP1": DeliveryPartner("DP1"),
        "DP2": DeliveryPartner("DP2"),
    }
    for partner in partners.values():
        service.register_partner(partner)
    dispatcher = PriorityDispatchCoordinator(service, partners, worker_count=2)
    service.attach_dispatcher(dispatcher)
    coordinator = ResilientFoodCoordinator(
        service=service,
        dispatcher=dispatcher,
        payment_gateway=PaymentGateway(failure_rate=0.4),
        breaker=CircuitBreaker(failure_threshold=2, recovery_timeout=0.5),
        retry_policy=RetryPolicy(attempts=3, base_delay=0.05),
        couriers=CourierPool(["C1", "C2"])
    )
    orders = [("FO-101", "Spicy Bites", 3, 25.0), ("FO-102", "Fusion Kitchen", 1, 18.0), ("FO-103", "Veg Delight", 2, 22.0)]
    for order_id, restaurant, priority, amount in orders:
        outcome = coordinator.fulfill(order_id, restaurant, priority, amount)
        print(f"Outcome for {order_id}: {outcome}")
        time.sleep(0.1)  # pacing between sagas
    print("Metrics:", coordinator.snapshot())
    if coordinator.dead_letter:
        print("Dead letters:", coordinator.dead_letter)
    dispatcher.shutdown()


if __name__ == "__main__":
    main()
Evolve the ride sharing platform from nearest-driver matching to surge-aware concurrent dispatch and finally wrap it with resilience patterns for mobility services.
Ride Sharing Platform ├─ Level 1: Matching Strategy → Driver Repository ├─ Level 2: Zone Locks → Surge Manager └─ Level 3: Resilient Dispatcher → Cache Fallback
Level 1 — Matching Engine
Focus: nearest driver selection with strategy-driven repository.
from dataclasses import dataclass
from typing import Dict, Protocol, Tuple, Optional
import math
@dataclass
class Driver:
    """A driver's live state as seen by the matcher."""

    driver_id: str
    location: Tuple[float, float]   # (x, y) coordinates used for distance matching
    available: bool = True          # False while assigned to a ride
    zone: str = "default"           # dispatch zone label
class MatchStrategy(Protocol):
    """Structural interface for a driver-selection policy."""

    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        """Return the chosen available driver, or None when no driver is free."""
class NearestMatch(MatchStrategy):
    """Picks the closest available driver by Euclidean distance (first wins ties)."""

    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        rx, ry = rider_loc
        reachable = [d for d in drivers.values() if d.available]
        if not reachable:
            return None
        # min() keeps the first of equally-distant drivers, matching a strict "<" scan.
        return min(reachable, key=lambda d: math.hypot(rx - d.location[0], ry - d.location[1]))
class RideSharingService:
    """Matches riders to drivers via the injected strategy and tracks assignments."""

    def __init__(self, strategy: MatchStrategy):
        self.strategy = strategy
        self.drivers: Dict[str, Driver] = {}
        self.assignments: Dict[str, str] = {}  # rider_id -> driver_id

    def add_driver(self, driver: Driver) -> None:
        """Register (or replace) a driver under its id."""
        self.drivers[driver.driver_id] = driver

    def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
        """Reserve the strategy's chosen driver; raises RuntimeError if none free."""
        chosen = self.strategy.pick(self.drivers, location)
        if chosen is None:
            raise RuntimeError('No drivers available')
        chosen.available = False
        self.assignments[rider_id] = chosen.driver_id
        return chosen.driver_id

    def complete_ride(self, rider_id: str) -> None:
        """Finish the ride and free its driver (KeyError if rider unknown)."""
        self.drivers[self.assignments.pop(rider_id)].available = True

    def cancel_ride(self, rider_id: str) -> None:
        """Free the driver if the rider actually had one; no-op otherwise."""
        released = self.assignments.pop(rider_id, None)
        if released:
            self.drivers[released].available = True
if __name__ == "__main__":
    # Demo: D1 (~0.022 away) is nearer to the rider than D2 (~0.032), so D1 wins.
    service = RideSharingService(NearestMatch())
    service.add_driver(Driver('D1', (12.9, 77.6), zone="north"))
    service.add_driver(Driver('D2', (12.95, 77.58), zone="south"))
    assigned = service.request_ride('R1', (12.92, 77.59))
    print('Assigned driver:', assigned)
    service.complete_ride('R1')
Level 2 — Concurrent Surge Dispatch
Extension: build on the core matcher with per-zone locks and surge multipliers to handle concurrent ride requests.
from __future__ import annotations

import math
import threading
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
@dataclass
class Driver:
    """A driver's live state as seen by the matcher."""

    driver_id: str
    location: Tuple[float, float]   # (x, y) coordinates used for distance matching
    available: bool = True          # False while assigned to a ride
    zone: str = "default"           # dispatch zone used for locking and surge
class MatchStrategy:
    """Abstract driver-selection policy; subclasses implement pick()."""

    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        """Return the chosen available driver, or None when no driver is free."""
        raise NotImplementedError
class NearestMatch(MatchStrategy):
    """Picks the closest available driver by Euclidean distance (first wins ties)."""

    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        rx, ry = rider_loc
        reachable = [d for d in drivers.values() if d.available]
        if not reachable:
            return None
        # min() keeps the first of equally-distant drivers, matching a strict "<" scan.
        return min(reachable, key=lambda d: math.hypot(rx - d.location[0], ry - d.location[1]))
class RideSharingService:
    """Matches riders to drivers via the injected strategy and tracks assignments."""

    def __init__(self, strategy: MatchStrategy):
        self.strategy = strategy
        self.drivers: Dict[str, Driver] = {}
        self.assignments: Dict[str, str] = {}  # rider_id -> driver_id

    def add_driver(self, driver: Driver) -> None:
        """Register (or replace) a driver under its id."""
        self.drivers[driver.driver_id] = driver

    def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
        """Reserve the strategy's chosen driver; raises RuntimeError if none free."""
        chosen = self.strategy.pick(self.drivers, location)
        if chosen is None:
            raise RuntimeError("No drivers available")
        chosen.available = False
        self.assignments[rider_id] = chosen.driver_id
        return chosen.driver_id

    def complete_ride(self, rider_id: str) -> None:
        """Finish the ride and free its driver (KeyError if rider unknown)."""
        self.drivers[self.assignments.pop(rider_id)].available = True
class ZoneLockManager:
    """Lazily creates and caches one shared reentrant lock per zone name."""

    def __init__(self) -> None:
        self._locks: Dict[str, threading.RLock] = {}

    def lock(self, zone: str) -> threading.RLock:
        """Return the RLock for *zone*, creating it on first use."""
        return self._locks.setdefault(zone, threading.RLock())
class SurgeManager:
    """Per-zone price multiplier that rises on demand and decays toward a floor."""

    def __init__(self, base_multiplier: float = 1.0, step: float = 0.2) -> None:
        self.base_multiplier = base_multiplier  # floor the multiplier never drops below
        self.step = step                        # change applied per bump/relax
        self.multipliers: Dict[str, float] = defaultdict(lambda: self.base_multiplier)
        self.lock = threading.Lock()

    def bump(self, zone: str) -> float:
        """Raise the zone's multiplier by one step; returns the rounded value."""
        with self.lock:
            raised = self.multipliers[zone] + self.step
            self.multipliers[zone] = raised
            return round(raised, 2)

    def relax(self, zone: str) -> float:
        """Lower the multiplier by one step, never below the base."""
        with self.lock:
            lowered = max(self.base_multiplier, self.multipliers[zone] - self.step)
            self.multipliers[zone] = lowered
            return round(lowered, 2)

    def current(self, zone: str) -> float:
        """Read the zone's multiplier (rounded for display)."""
        with self.lock:
            return round(self.multipliers[zone], 2)
class ConcurrentRideSharingService(RideSharingService):
    """Zone-locked matcher that applies surge pricing and counts lock contention."""

    def __init__(self, strategy: MatchStrategy, zone_locks: ZoneLockManager, surge: SurgeManager):
        super().__init__(strategy)
        self.zone_locks = zone_locks
        self.surge = surge
        # rider_id -> zone, needed to relax the correct zone on completion.
        self.assignment_zones: Dict[str, str] = {}
        self.metrics_lock = threading.Lock()
        self.metrics = {"completed": 0, "contention": 0}

    def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float]:
        """Assign a driver under the zone lock; returns (driver_id, surge multiplier).

        A short timed acquire is tried first purely to detect contention; on
        timeout the event is counted and we block until the lock is free.
        NOTE(review): the strategy scans all drivers, not just this zone's —
        confirm cross-zone matching is intended.
        """
        lock = self.zone_locks.lock(zone)
        acquired = lock.acquire(timeout=0.1)
        if not acquired:
            with self.metrics_lock:
                self.metrics["contention"] += 1
            lock.acquire()  # second (blocking) acquire; exactly one release below
        try:
            driver_id = super().request_ride(rider_id, location)
            self.assignment_zones[rider_id] = zone
            surge_multiplier = self.surge.bump(zone)
            return driver_id, surge_multiplier
        finally:
            lock.release()

    def complete_ride(self, rider_id: str) -> None:
        """Free the driver, relax the zone's surge, and count the completion."""
        zone = self.assignment_zones.pop(rider_id, "default")
        super().complete_ride(rider_id)
        self.surge.relax(zone)
        with self.metrics_lock:
            self.metrics["completed"] += 1

    def snapshot(self) -> Dict[str, int]:
        """Copy of the metrics dict taken under the metrics lock."""
        with self.metrics_lock:
            return dict(self.metrics)
def main() -> None:
    """Demo: five concurrent riders compete for drivers in the central zone."""
    zone_locks = ZoneLockManager()
    surge = SurgeManager()
    service = ConcurrentRideSharingService(NearestMatch(), zone_locks, surge)
    service.add_driver(Driver("D1", (12.9, 77.6), zone="central"))
    service.add_driver(Driver("D2", (12.91, 77.61), zone="central"))
    # NOTE(review): NearestMatch ignores driver.zone, so a "central" request
    # can also take D3 from "north" — confirm intended.
    service.add_driver(Driver("D3", (12.95, 77.58), zone="north"))

    def rider_task(rider_id: str) -> None:
        try:
            driver_id, surge_multiplier = service.request_ride(rider_id, (12.9, 77.6), "central")
            print(f"{rider_id} → {driver_id} at surge {surge_multiplier}x")
            time.sleep(0.05)  # hold the driver briefly before completing
            service.complete_ride(rider_id)
        except RuntimeError as exc:
            print(f"{rider_id} failed: {exc}")

    threads = [threading.Thread(target=rider_task, args=(f"R{i}",)) for i in range(1, 6)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Metrics:", service.snapshot())
    print("Current surge (central):", surge.current("central"))


if __name__ == "__main__":
    main()
Level 3 — Resilient Mobility
Extension: resilient dispatch with cache fallback for outages.
from __future__ import annotations
import math
import random
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, Optional, Tuple
@dataclass
class Driver:
    """A driver's live state as seen by the matcher and the availability caches."""

    driver_id: str
    location: Tuple[float, float]   # (x, y) coordinates used for distance matching
    available: bool = True          # False while assigned to a ride
    zone: str = "default"           # dispatch zone used for locking, surge and fallback lookup
class MatchStrategy:
    """Abstract driver-selection policy; subclasses implement pick()."""

    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        """Return the chosen available driver, or None when no driver is free."""
        raise NotImplementedError
class NearestMatch(MatchStrategy):
    """Picks the closest available driver by Euclidean distance (first wins ties)."""

    def pick(self, drivers: Dict[str, Driver], rider_loc: Tuple[float, float]) -> Optional[Driver]:
        rx, ry = rider_loc
        reachable = [d for d in drivers.values() if d.available]
        if not reachable:
            return None
        # min() keeps the first of equally-distant drivers, matching a strict "<" scan.
        return min(reachable, key=lambda d: math.hypot(rx - d.location[0], ry - d.location[1]))
class RideSharingService:
    """Matches riders to drivers via the injected strategy and tracks assignments."""

    def __init__(self, strategy: MatchStrategy):
        self.strategy = strategy
        self.drivers: Dict[str, Driver] = {}
        self.assignments: Dict[str, str] = {}  # rider_id -> driver_id

    def add_driver(self, driver: Driver) -> None:
        """Register (or replace) a driver under its id."""
        self.drivers[driver.driver_id] = driver

    def request_ride(self, rider_id: str, location: Tuple[float, float]) -> str:
        """Reserve the strategy's chosen driver; raises RuntimeError if none free."""
        chosen = self.strategy.pick(self.drivers, location)
        if chosen is None:
            raise RuntimeError("No drivers available")
        chosen.available = False
        self.assignments[rider_id] = chosen.driver_id
        return chosen.driver_id

    def complete_ride(self, rider_id: str) -> None:
        """Finish the ride and free its driver (KeyError if rider unknown)."""
        self.drivers[self.assignments.pop(rider_id)].available = True

    def cancel_ride(self, rider_id: str) -> None:
        """Free the driver if the rider actually had one; no-op otherwise."""
        released = self.assignments.pop(rider_id, None)
        if released:
            self.drivers[released].available = True
class ZoneLockManager:
    """Lazily creates and caches one shared reentrant lock per zone name."""

    def __init__(self) -> None:
        self._locks: Dict[str, threading.RLock] = {}

    def lock(self, zone: str) -> threading.RLock:
        """Return the RLock for *zone*, creating it on first use."""
        return self._locks.setdefault(zone, threading.RLock())
class SurgeManager:
    """Per-zone price multiplier that rises on demand and decays toward a floor."""

    def __init__(self, base_multiplier: float = 1.0, step: float = 0.2) -> None:
        self.base_multiplier = base_multiplier  # floor the multiplier never drops below
        self.step = step                        # change applied per bump/relax
        self.multipliers: Dict[str, float] = defaultdict(lambda: self.base_multiplier)
        self.lock = threading.Lock()

    def bump(self, zone: str) -> float:
        """Raise the zone's multiplier by one step; returns the rounded value."""
        with self.lock:
            raised = self.multipliers[zone] + self.step
            self.multipliers[zone] = raised
            return round(raised, 2)

    def relax(self, zone: str) -> float:
        """Lower the multiplier by one step, never below the base."""
        with self.lock:
            lowered = max(self.base_multiplier, self.multipliers[zone] - self.step)
            self.multipliers[zone] = lowered
            return round(lowered, 2)

    def current(self, zone: str) -> float:
        """Read the zone's multiplier (rounded for display)."""
        with self.lock:
            return round(self.multipliers[zone], 2)
class ConcurrentRideSharingService(RideSharingService):
    """Serializes matching per zone and couples every assignment to surge pricing."""

    def __init__(self, strategy: MatchStrategy, zone_locks: ZoneLockManager, surge: SurgeManager):
        super().__init__(strategy)
        self.zone_locks = zone_locks
        self.surge = surge
        self.assignment_zones: Dict[str, str] = {}  # rider_id -> zone

    def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float]:
        """Match under the zone lock; returns (driver_id, bumped surge multiplier)."""
        with self.zone_locks.lock(zone):
            assigned = super().request_ride(rider_id, location)
            self.assignment_zones[rider_id] = zone
            return assigned, self.surge.bump(zone)

    def complete_ride(self, rider_id: str) -> None:
        """Free the driver and relax surge for the ride's zone."""
        ride_zone = self.assignment_zones.pop(rider_id, "default")
        super().complete_ride(rider_id)
        self.surge.relax(ride_zone)

    def cancel_ride(self, rider_id: str) -> None:
        """Undo an assignment; relax surge only if the rider was actually assigned."""
        ride_zone = self.assignment_zones.pop(rider_id, None)
        super().cancel_ride(rider_id)
        if ride_zone:
            self.surge.relax(ride_zone)
class AvailabilityCache:
    """Thread-safe snapshot of driver availability, usable when the primary path fails."""

    def __init__(self) -> None:
        self.store: Dict[str, Driver] = {}
        self.lock = threading.Lock()

    def upsert(self, driver: Driver) -> None:
        """Store a defensive copy of *driver* keyed by its id."""
        with self.lock:
            self.store[driver.driver_id] = Driver(driver.driver_id, driver.location, driver.available, driver.zone)

    def reserve_by_id(self, driver_id: str) -> None:
        """Mark the cached driver unavailable, if present."""
        with self.lock:
            cached = self.store.get(driver_id)
            if cached is not None:
                cached.available = False

    def reserve_best(self, zone: str, rider_loc: Tuple[float, float]) -> Optional[Driver]:
        """Atomically pick and reserve the nearest available driver in *zone*.

        Returns a copy already marked unavailable, or None when the zone is dry.
        """
        rx, ry = rider_loc
        with self.lock:
            in_zone = [d for d in self.store.values() if d.zone == zone and d.available]
            if not in_zone:
                return None
            # min() keeps the first of equally-distant drivers, matching a strict "<" scan.
            nearest = min(in_zone, key=lambda d: math.hypot(rx - d.location[0], ry - d.location[1]))
            nearest.available = False
            return Driver(nearest.driver_id, nearest.location, False, nearest.zone)

    def release(self, driver_id: str) -> None:
        """Mark the cached driver available again, if present."""
        with self.lock:
            if driver_id in self.store:
                self.store[driver_id].available = True
class CircuitBreaker:
    """Allow/record style breaker: callers ask allow(), then report the outcome.

    Bug fix: the original reset failure_count to 0 when opening, so a failed
    HALF_OPEN probe only raised the count back to 1 (< threshold) and the
    state stayed HALF_OPEN — allow() then admitted all traffic instead of
    re-opening. A failure while HALF_OPEN now re-opens immediately.
    """

    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold   # consecutive failures that trip the breaker
        self.recovery_timeout = recovery_timeout     # seconds before an OPEN breaker probes again
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def allow(self) -> bool:
        """Return True when a call may proceed (CLOSED, HALF_OPEN, or probe time)."""
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                    return True
                return False
            return True

    def record_failure(self) -> None:
        """Count a failure; trips OPEN at the threshold or on a failed probe."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.state == "HALF_OPEN" or self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                self.failure_count = 0

    def record_success(self) -> None:
        """Reset the breaker to CLOSED after a successful call."""
        with self.lock:
            self.failure_count = 0
            self.state = "CLOSED"
class MobilityGateway:
    """Flaky external-dependency stub that fails with the configured probability."""

    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate  # probability of a simulated timeout

    def reserve(self, zone: str) -> None:
        """Simulate a reservation call; raises RuntimeError on a simulated timeout."""
        roll = random.random()
        if roll < self.failure_rate:
            raise RuntimeError(f"Mobility gateway timeout for zone {zone}")
class ResilientRideOrchestrator:
    """Routes rides through the primary service; falls back to a cached driver pool.

    The circuit breaker gates the primary path (gateway + live service); when
    it is open, or the primary attempt fails, the fallback cache is consulted.
    """

    def __init__(self,
                 service: ConcurrentRideSharingService,
                 primary_cache: AvailabilityCache,
                 fallback_cache: AvailabilityCache,
                 breaker: CircuitBreaker,
                 gateway: MobilityGateway):
        self.service = service
        self.primary_cache = primary_cache
        self.fallback_cache = fallback_cache
        self.breaker = breaker
        self.gateway = gateway
        # rider_id -> (source, driver_id, zone) where source is "primary"/"fallback".
        self.assignments: Dict[str, Tuple[str, str, str]] = {}
        self.metrics_lock = threading.Lock()
        self.metrics = {"primary": 0, "fallback": 0, "rejected": 0}
        # NOTE(review): appended/read without metrics_lock — confirm acceptable.
        self.dead_letters: Deque[str] = deque()

    def register_driver(self, driver: Driver, *, primary: bool = True) -> None:
        """Add a driver: primary drivers join the live service and primary cache;
        every driver is mirrored into the fallback cache."""
        if primary:
            self.service.add_driver(driver)
            self.primary_cache.upsert(driver)
        self.fallback_cache.upsert(driver)

    def request_ride(self, rider_id: str, location: Tuple[float, float], zone: str) -> Tuple[str, float, str]:
        """Try the primary path when the breaker allows; otherwise use the fallback.

        Returns (driver_id, surge_multiplier, source); raises RuntimeError when
        neither pool has a driver (rider is dead-lettered first).
        """
        if self.breaker.allow():
            try:
                self.gateway.reserve(zone)  # external call guarded by the breaker
                driver_id, surge = self.service.request_ride(rider_id, location, zone)
                self.primary_cache.reserve_by_id(driver_id)
                self.breaker.record_success()
                with self.metrics_lock:
                    self.metrics["primary"] += 1
                self.assignments[rider_id] = ("primary", driver_id, zone)
                return driver_id, surge, "primary"
            except Exception:
                self.breaker.record_failure()
                # Undo a half-finished primary assignment (no-op when the
                # failure happened before a driver was assigned).
                self.service.cancel_ride(rider_id)
        fallback_driver = self.fallback_cache.reserve_best(zone, location)
        if fallback_driver:
            with self.metrics_lock:
                self.metrics["fallback"] += 1
            self.assignments[rider_id] = ("fallback", fallback_driver.driver_id, zone)
            # NOTE(review): fallback rides read the current surge without
            # bumping it — confirm that pricing is intended.
            return fallback_driver.driver_id, self.service.surge.current(zone), "fallback"
        with self.metrics_lock:
            self.metrics["rejected"] += 1
        self.dead_letters.append(rider_id)
        raise RuntimeError("No drivers available in any pool")

    def complete_ride(self, rider_id: str) -> None:
        """Release the driver back to whichever pool served the ride (KeyError if unknown)."""
        source, driver_id, zone = self.assignments.pop(rider_id)
        if source == "primary":
            # The service relaxes surge itself inside complete_ride().
            self.service.complete_ride(rider_id)
            self.primary_cache.release(driver_id)
        else:
            self.fallback_cache.release(driver_id)
            self.service.surge.relax(zone)

    def cancel_ride(self, rider_id: str) -> None:
        """Best-effort cancellation; silently ignores unknown riders."""
        record = self.assignments.pop(rider_id, None)
        if not record:
            return
        source, driver_id, zone = record
        if source == "primary":
            self.service.cancel_ride(rider_id)
            self.primary_cache.release(driver_id)
        else:
            self.fallback_cache.release(driver_id)
            self.service.surge.relax(zone)

    def snapshot(self) -> Dict[str, int]:
        """Thread-safe copy of the routing counters."""
        with self.metrics_lock:
            return dict(self.metrics)
def main() -> None:
    """Demo: six riders against two primary drivers plus one fallback-only driver."""
    random.seed(7)  # deterministic gateway failures for the demo
    zone_locks = ZoneLockManager()
    surge = SurgeManager()
    service = ConcurrentRideSharingService(NearestMatch(), zone_locks, surge)
    primary_cache = AvailabilityCache()
    fallback_cache = AvailabilityCache()
    breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=0.4)
    gateway = MobilityGateway(failure_rate=0.35)
    orchestrator = ResilientRideOrchestrator(service, primary_cache, fallback_cache, breaker, gateway)
    orchestrator.register_driver(Driver("D1", (12.9, 77.6), zone="central"))
    orchestrator.register_driver(Driver("D2", (12.91, 77.61), zone="central"))
    # FD1 exists only in the fallback cache (primary=False).
    orchestrator.register_driver(Driver("FD1", (12.88, 77.58), zone="central"), primary=False)

    def rider_task(rider_id: str) -> None:
        try:
            driver_id, surge_multiplier, source = orchestrator.request_ride(rider_id, (12.9, 77.6), "central")
            print(f"{rider_id} → {driver_id} via {source} at {surge_multiplier}x")
            time.sleep(random.uniform(0.05, 0.15))  # simulated ride duration
            orchestrator.complete_ride(rider_id)
        except RuntimeError as exc:
            print(f"{rider_id} failed: {exc}")

    threads = [threading.Thread(target=rider_task, args=(f"R{i}",)) for i in range(1, 7)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Metrics:", orchestrator.snapshot())
    if orchestrator.dead_letters:
        print("Dead letters:", list(orchestrator.dead_letters))


if __name__ == "__main__":
    main()
Progressively build the notification platform from synchronous fan-out to prioritized asynchronous delivery and finally to a resilient multi-provider gateway.
Notification Gateway ├─ Level 1: Publisher → Channel Observers ├─ Level 2: Queue → Worker Pool → Dead Letters └─ Level 3: Provider Bulkheads → Circuit Breakers
Level 1 — Event Fan-out
Focus: observer-driven notification fan-out with strategy-based channels.
from typing import Protocol, List
class ChannelStrategy(Protocol):
    """Structural interface for a notification delivery channel."""

    def send(self, recipient: str, message: str) -> None:
        """Deliver *message* to *recipient*."""
class EmailStrategy:
    """Channel that writes email sends to stdout."""

    def send(self, recipient: str, message: str) -> None:
        """Print the message in the EMAIL->recipient format."""
        line = f"EMAIL->{recipient}: {message}"
        print(line)
class SmsStrategy:
    """Channel that writes SMS sends to stdout."""

    def send(self, recipient: str, message: str) -> None:
        """Print the message in the SMS->recipient format."""
        line = f"SMS->{recipient}: {message}"
        print(line)
class NotificationService:
    """Fans a single notification out to every configured channel."""

    def __init__(self, strategies: List[ChannelStrategy]):
        self.strategies = strategies

    def notify(self, recipient: str, message: str) -> None:
        """Send *message* through each channel, in registration order."""
        for channel in self.strategies:
            channel.send(recipient, message)
if __name__ == "__main__":
    # Demo: one message fans out to both the email and SMS channels.
    service = NotificationService([EmailStrategy(), SmsStrategy()])
    service.notify('user@example.com', 'Welcome to the platform!')
Level 2 — Asynchronous Delivery Workers
Extension: layer a worker pool and retrying queue on top of the Level 1 multi-channel service.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from queue import Queue
from typing import List, Optional, Protocol
class ChannelStrategy(Protocol):
    """Structural interface for a notification delivery channel."""

    def send(self, recipient: str, message: str) -> None:
        """Deliver *message* to *recipient*."""
class EmailStrategy:
    """Channel that writes email sends to stdout."""

    def send(self, recipient: str, message: str) -> None:
        """Print the message in the EMAIL->recipient format."""
        line = f"EMAIL->{recipient}: {message}"
        print(line)
class SmsStrategy:
    """Channel that writes SMS sends to stdout."""

    def send(self, recipient: str, message: str) -> None:
        """Print the message in the SMS->recipient format."""
        line = f"SMS->{recipient}: {message}"
        print(line)
class NotificationService:
    """Fans a single notification out to every configured channel."""

    def __init__(self, strategies: List[ChannelStrategy]):
        self.strategies = strategies

    def notify(self, recipient: str, message: str) -> None:
        """Send *message* through each channel, in registration order."""
        for channel in self.strategies:
            channel.send(recipient, message)
@dataclass
class NotificationJob:
    """A queued delivery request with retry bookkeeping."""

    recipient: str
    message: str
    # Number of failed delivery attempts so far; bumped by the worker.
    attempt: int = 0
class AsyncNotificationGateway:
    """Background delivery pipeline.

    Worker threads drain a queue of NotificationJob items, retrying
    transient failures with linear backoff and dead-lettering jobs that
    exhaust their retry budget.
    """

    def __init__(self, service: NotificationService, max_workers: int = 3, max_retries: int = 2):
        self.service = service
        self.max_retries = max_retries
        self.queue: Queue[NotificationJob] = Queue()
        self.dead_letters: List[NotificationJob] = []
        self.running = True
        # Daemon threads so an un-shut-down gateway cannot block interpreter exit.
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(max_workers)]
        for worker in self.workers:
            worker.start()

    def enqueue(self, recipient: str, message: str) -> None:
        """Queue a notification for asynchronous delivery."""
        self.queue.put(NotificationJob(recipient, message))

    def _dispatch(self, job: NotificationJob) -> None:
        # Messages containing "FAIL" simulate a transport failure for the demo.
        if "FAIL" in job.message:
            raise RuntimeError("channel failure")
        self.service.notify(job.recipient, job.message)

    def _worker(self) -> None:
        # BUG FIX: `Empty` was referenced without ever being imported
        # (`from queue import Queue` only), so every queue-poll timeout
        # raised NameError and killed the worker thread.
        from queue import Empty
        while self.running:
            try:
                job = self.queue.get(timeout=0.5)
            except Empty:
                continue
            try:
                self._dispatch(job)
            except Exception as exc:
                job.attempt += 1
                if job.attempt <= self.max_retries:
                    print(f"Retry {job.attempt} for {job.recipient}: {exc}")
                    time.sleep(0.1 * job.attempt)
                    self.queue.put(job)
                else:
                    print(f"Dead-lettering {job.recipient}: {job.message}")
                    self.dead_letters.append(job)
            finally:
                # Pairs with the successful get() above; re-queued jobs are new puts.
                self.queue.task_done()

    def shutdown(self) -> None:
        """Signal workers to stop and join each briefly."""
        self.running = False
        for worker in self.workers:
            worker.join(timeout=0.2)
def main() -> None:
    """Demo: enqueue three notifications (one forced failure) and show dead letters."""
    base_service = NotificationService([EmailStrategy(), SmsStrategy()])
    gateway = AsyncNotificationGateway(base_service, max_workers=2, max_retries=2)
    gateway.enqueue("user1", "Welcome!")
    gateway.enqueue("user2", "FAIL-SEND")  # "FAIL" marker triggers the simulated failure
    gateway.enqueue("user3", "Daily digest")
    time.sleep(1.5)  # give workers time to drain and retry before shutdown
    gateway.shutdown()
    print("Dead letters:", [(job.recipient, job.message) for job in gateway.dead_letters])


if __name__ == "__main__":
    main()
Level 3 — Multi-Provider Resiliency
Extension: resilient multi-provider gateway with bulkheads, retries, and circuit breakers.
import threading
import time
import random
from collections import deque
from typing import Callable, Protocol
class Provider(Protocol):
    """Structural interface for an outbound notification provider."""

    def send(self, recipient: str, message: str) -> None:
        """Deliver `message`; may raise on provider failure."""
        ...
class PrimaryProvider:
    """Flaky provider: every second send raises to simulate an outage."""

    def __init__(self):
        self.counter = 0

    def send(self, recipient: str, message: str) -> None:
        """Deliver on odd calls; raise RuntimeError on even calls."""
        self.counter += 1
        even_call = self.counter % 2 == 0
        if even_call:
            raise RuntimeError('Primary provider outage')
        print(f"PRIMARY->{recipient}: {message}")
class BackupProvider:
    """Always-available fallback provider."""

    def send(self, recipient: str, message: str) -> None:
        """Print the backup delivery line."""
        print("BACKUP->" + recipient + ": " + message)
class Bulkhead:
    """Caps concurrent calls into a provider with a counting semaphore."""

    def __init__(self, provider: Provider, capacity: int):
        self.provider = provider
        self.semaphore = threading.Semaphore(capacity)

    def execute(self, recipient: str, message: str) -> None:
        """Forward to the provider while holding one semaphore permit."""
        self.semaphore.acquire()
        try:
            self.provider.send(recipient, message)
        finally:
            self.semaphore.release()
class Breaker:
    """Minimal circuit breaker for a zero-argument callable.

    Opens after `failure_threshold` consecutive failures and rejects calls
    until `recovery_time` seconds have passed since the last failure.
    """

    def __init__(self, failure_threshold: int, recovery_time: float):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.last_failure = 0.0
        self.state = 'CLOSED'

    def invoke(self, func: Callable[[], None]) -> None:
        """Run `func`; raises RuntimeError while the breaker is open."""
        now = time.time()
        if self.state == 'OPEN' and now - self.last_failure < self.recovery_time:
            raise RuntimeError('Breaker open')
        try:
            func()
        except Exception:
            self.failures += 1
            # BUG FIX: stamp the actual failure moment, not the (possibly
            # stale) timestamp captured before func() ran.
            self.last_failure = time.time()
            if self.failures >= self.failure_threshold:
                self.state = 'OPEN'
            # BUG FIX: bare `raise` preserves the original traceback;
            # `raise exc` restarted the traceback at this frame.
            raise
        else:
            # Success closes the breaker and clears the failure streak.
            self.failures = 0
            self.state = 'CLOSED'
class ResilientGateway:
    """Routes sends through the primary provider, falling back to backup."""

    def __init__(self):
        self.primary = Bulkhead(PrimaryProvider(), capacity=2)
        self.backup = Bulkhead(BackupProvider(), capacity=4)
        self.breaker = Breaker(2, 2.0)

    def send(self, recipient: str, message: str) -> None:
        """Attempt primary delivery; on any failure use the backup bulkhead."""
        def primary_call() -> None:
            self.primary.execute(recipient, message)

        try:
            self.breaker.invoke(primary_call)
        except Exception:
            print('Primary failed, using backup…')
            self.backup.execute(recipient, message)
if __name__ == "__main__":
    # Demo: alternate sends trip the primary, exercising the backup path.
    gateway = ResilientGateway()
    for idx in range(1, 7):
        gateway.send(f'user{idx}', 'Security alert')
        time.sleep(0.4)
Progressively enhance the rate limiting system from an in-process token bucket to a distributed sliding window and finally a resilient API gateway that degrades gracefully under failure.
Rate Limiter Stack ├─ Level 1: Token Bucket Evaluator ├─ Level 2: Distributed Sliding Window + Coordination └─ Level 3: Gateway Orchestrator with Circuit Breaker & Fallback
Level 1 — Core Token Bucket
Implement a token bucket rate limiter for controlling API calls. Sample Input: bursts of requests. Sample Output: allow/deny logs.
import threading
import time
class TokenBucket:
    """Thread-safe token-bucket limiter.

    `capacity` bounds the burst size; `refill_rate` is tokens per second.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        # Caller must hold self.lock.
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = int(elapsed * self.refill_rate)
        if tokens_to_add > 0:
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            # BUG FIX: advance the refill clock only by the whole tokens
            # granted instead of snapping it to `now`. Snapping discarded
            # the fractional remainder on every refill, making the long-run
            # rate drift below `refill_rate`.
            self.last_refill += tokens_to_add / self.refill_rate

    def try_consume(self) -> bool:
        """Take one token if available; False means throttled."""
        with self.lock:
            self._refill()
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False
class RateLimiter:
    """Facade exposing an allow/deny decision backed by a token bucket."""

    def __init__(self, bucket: TokenBucket):
        self.bucket = bucket

    def allow(self) -> bool:
        """True when a token was available and consumed."""
        decision = self.bucket.try_consume()
        return decision
if __name__ == "__main__":
    # Demo: burst of 10 requests against a 5-token bucket refilling 2/s.
    limiter = RateLimiter(TokenBucket(capacity=5, refill_rate=2))
    for _ in range(10):
        print("Allowed" if limiter.allow() else "Throttled")
        time.sleep(0.3)
Level 2 — Distributed Sliding Window
Create a sliding window rate limiter with shared state protected by locks to simulate distributed instances. Sample Input: parallel request threads. Sample Output: accurate allow/deny with consistent window counts.
from __future__ import annotations
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict
class TokenBucket:
    """Thread-safe token-bucket limiter (per-replica quota).

    `capacity` bounds the burst size; `refill_rate` is tokens per second.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        # Caller must hold self.lock.
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = int(elapsed * self.refill_rate)
        if tokens_to_add > 0:
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            # BUG FIX: advance the refill clock by the whole tokens granted
            # instead of snapping it to `now`, which dropped the fractional
            # remainder each refill and under-delivered the configured rate.
            self.last_refill += tokens_to_add / self.refill_rate

    def try_consume(self) -> bool:
        """Take one token if available; False means throttled."""
        with self.lock:
            self._refill()
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False
class RateLimiter:
    """Named limiter replica; delegates decisions to its token bucket."""

    def __init__(self, name: str, bucket: TokenBucket):
        self.name = name
        self.bucket = bucket

    def allow(self) -> bool:
        """Consume one token from the backing bucket if possible."""
        granted = self.bucket.try_consume()
        return granted
@dataclass
class WindowState:
    """Per-tenant sliding-window bookkeeping."""

    # Timestamps (epoch seconds) of admitted requests, oldest first.
    events: Deque[float]
class SlidingWindowCoordinator:
    """Per-tenant sliding-window limit layered on top of token-bucket replicas.

    A request is admitted only if (a) the tenant's event count inside the
    window is below `max_requests` and (b) the tenant's bucket grants a token.
    """

    def __init__(self, window_seconds: float, max_requests: int, replicas: Dict[str, RateLimiter]):
        self.window_seconds = window_seconds
        self.max_requests = max_requests
        self.replicas = replicas
        self.window: Dict[str, WindowState] = defaultdict(lambda: WindowState(deque()))
        self.window_lock = threading.Lock()

    def allow(self, tenant: str) -> bool:
        """Admit or reject one request for `tenant`."""
        now = time.time()
        with self.window_lock:
            state = self.window[tenant]
            # Evict events that have aged out of the window.
            while state.events and now - state.events[0] > self.window_seconds:
                state.events.popleft()
            if len(state.events) >= self.max_requests:
                return False
            # BUG FIX: reserve the window slot while still holding the lock.
            # Previously the count was checked here but the event appended
            # only after the bucket call, so concurrent callers could all
            # pass the check and exceed `max_requests` within one window.
            state.events.append(now)
        limiter = self.replicas[tenant]
        if not limiter.allow():
            # Bucket denied: roll back the reservation made above.
            with self.window_lock:
                try:
                    self.window[tenant].events.remove(now)
                except ValueError:
                    pass  # already evicted by window expiry
            return False
        return True

    def release(self, tenant: str) -> None:
        """Return one admission: drop the newest window event and refund a token."""
        limiter = self.replicas[tenant]
        with self.window_lock:
            state = self.window[tenant]
            if state.events:
                state.events.pop()
        with limiter.bucket.lock:
            limiter.bucket.tokens = min(limiter.bucket.capacity, limiter.bucket.tokens + 1)
def main() -> None:
    """Demo: two tenants firing concurrent requests through the coordinator."""
    replicas = {
        "tenant-a": RateLimiter("tenant-a", TokenBucket(capacity=5, refill_rate=3)),
        "tenant-b": RateLimiter("tenant-b", TokenBucket(capacity=3, refill_rate=2)),
    }
    coordinator = SlidingWindowCoordinator(window_seconds=1.0, max_requests=4, replicas=replicas)
    decisions: list[tuple[str, bool]] = []

    def fire(tenant: str, idx: int) -> None:
        # Worker body: record the allow/deny decision per request.
        allowed = coordinator.allow(tenant)
        decisions.append((f"{tenant}-{idx}", allowed))

    threads = [
        threading.Thread(target=fire, args=("tenant-a", i))
        for i in range(8)
    ] + [
        threading.Thread(target=fire, args=("tenant-b", i))
        for i in range(6)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    for decision in sorted(decisions):
        print(decision)


if __name__ == "__main__":
    main()
Level 3 — Resilient Gateway
Combine rate limiting with adaptive throttling and circuit breaker to protect downstream services. Sample Input: downstream service throwing errors. Sample Output: dynamic throttling decisions and breaker state logs.
from __future__ import annotations
import random
import threading
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, Dict, Optional
class TokenBucket:
    """Thread-safe token-bucket limiter (Level 3 copy).

    `capacity` bounds the burst size; `refill_rate` is tokens per second.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = threading.Lock()

    def _refill(self) -> None:
        # Caller must hold self.lock.
        now = time.time()
        elapsed = now - self.last_refill
        tokens_to_add = int(elapsed * self.refill_rate)
        if tokens_to_add > 0:
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            # BUG FIX: advance the refill clock by the whole tokens granted
            # instead of snapping it to `now`, which dropped the fractional
            # remainder each refill and under-delivered the configured rate.
            self.last_refill += tokens_to_add / self.refill_rate

    def try_consume(self) -> bool:
        """Take one token if available; False means throttled."""
        with self.lock:
            self._refill()
            if self.tokens > 0:
                self.tokens -= 1
                return True
            return False
class RateLimiter:
    """Named limiter replica; delegates decisions to its token bucket."""

    def __init__(self, name: str, bucket: TokenBucket):
        self.name = name
        self.bucket = bucket

    def allow(self) -> bool:
        """Consume one token from the backing bucket if possible."""
        granted = self.bucket.try_consume()
        return granted
@dataclass
class WindowState:
    """Per-tenant sliding-window bookkeeping."""

    # Timestamps (epoch seconds) of admitted requests, oldest first.
    events: Deque[float]
class SlidingWindowCoordinator:
    """Per-tenant sliding-window limit layered on top of token-bucket replicas.

    A request is admitted only if (a) the tenant's event count inside the
    window is below `max_requests` and (b) the tenant's bucket grants a token.
    """

    def __init__(self, window_seconds: float, max_requests: int, replicas: Dict[str, RateLimiter]):
        self.window_seconds = window_seconds
        self.max_requests = max_requests
        self.replicas = replicas
        self.window: Dict[str, WindowState] = defaultdict(lambda: WindowState(deque()))
        self.window_lock = threading.Lock()

    def allow(self, tenant: str) -> bool:
        """Admit or reject one request for `tenant`."""
        now = time.time()
        with self.window_lock:
            state = self.window[tenant]
            # Evict events that have aged out of the window.
            while state.events and now - state.events[0] > self.window_seconds:
                state.events.popleft()
            if len(state.events) >= self.max_requests:
                return False
            # BUG FIX: reserve the window slot while still holding the lock;
            # the old check-then-append-later pattern let concurrent callers
            # exceed `max_requests` within a single window.
            state.events.append(now)
        limiter = self.replicas[tenant]
        if not limiter.allow():
            # Bucket denied: roll back the reservation made above.
            with self.window_lock:
                try:
                    self.window[tenant].events.remove(now)
                except ValueError:
                    pass  # already evicted by window expiry
            return False
        return True

    def release(self, tenant: str) -> None:
        """Return one admission: drop the newest window event and refund a token."""
        limiter = self.replicas[tenant]
        with self.window_lock:
            state = self.window[tenant]
            if state.events:
                state.events.pop()
        with limiter.bucket.lock:
            limiter.bucket.tokens = min(limiter.bucket.capacity, limiter.bucket.tokens + 1)
class CircuitBreaker:
    """Three-state (CLOSED/OPEN/HALF_OPEN) breaker around a callable."""

    def __init__(self, failure_threshold: int, recovery_timeout: float):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func: Callable[[], str]) -> str:
        """Invoke `func`, tracking failures; raises RuntimeError while open."""
        self._gate()
        try:
            result = func()
        except Exception:
            self._record_failure()
            raise
        self._record_success()
        return result

    def _gate(self) -> None:
        # Reject fast while OPEN, but probe (HALF_OPEN) once the timeout passes.
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = "HALF_OPEN"
                else:
                    raise RuntimeError("Downstream circuit open")

    def _record_failure(self) -> None:
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "OPEN"
                self.failure_count = 0

    def _record_success(self) -> None:
        with self.lock:
            self.failure_count = 0
            self.state = "CLOSED"
class DownstreamService:
    """Simulated backend that fails randomly at `failure_rate` probability."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def call(self, payload: str) -> str:
        """Return a 200-style response or raise to mimic a timeout."""
        roll = random.random()
        if roll < self.failure_rate:
            raise RuntimeError("backend timeout")
        return f"200 OK for {payload}"
class ProtectiveGateway:
    """Front door combining rate limiting, a circuit breaker and fallback metrics."""

    def __init__(self, coordinator: SlidingWindowCoordinator, breaker: CircuitBreaker, service: DownstreamService):
        self.coordinator = coordinator
        self.breaker = breaker
        self.service = service
        self.metrics_lock = threading.Lock()
        self.metrics = {"throttled": 0, "success": 0, "fallback": 0}

    def _bump(self, key: str) -> None:
        # Counters are shared across worker threads.
        with self.metrics_lock:
            self.metrics[key] += 1

    def handle_request(self, tenant: str, payload: str) -> str:
        """Admit, execute and account for one request; never raises."""
        if not self.coordinator.allow(tenant):
            self._bump("throttled")
            return f"{tenant}/{payload}: THROTTLED"
        try:
            response = self.breaker.call(lambda: self.service.call(payload))
        except Exception as exc:
            # Refund the admission so failed calls do not consume quota.
            self.coordinator.release(tenant)
            self._bump("fallback")
            return f"{tenant}/{payload}: FALLBACK ({exc})"
        self._bump("success")
        return f"{tenant}/{payload}: {response}"

    def snapshot(self) -> Dict[str, int]:
        """Point-in-time copy of the counters."""
        with self.metrics_lock:
            return dict(self.metrics)
def main() -> None:
    """Demo: 12 requests across two tenants against a flaky downstream."""
    random.seed(17)  # deterministic demo output
    replicas = {
        "tenant-a": RateLimiter("tenant-a", TokenBucket(capacity=6, refill_rate=4)),
        "tenant-b": RateLimiter("tenant-b", TokenBucket(capacity=4, refill_rate=3)),
    }
    coordinator = SlidingWindowCoordinator(window_seconds=1.5, max_requests=5, replicas=replicas)
    gateway = ProtectiveGateway(coordinator, CircuitBreaker(failure_threshold=3, recovery_timeout=0.8), DownstreamService(failure_rate=0.45))
    for idx in range(12):
        # Every third request (idx % 3 == 0) goes to tenant-b.
        tenant = "tenant-a" if idx % 3 else "tenant-b"
        result = gateway.handle_request(tenant, f"req-{idx}")
        print(result)
        time.sleep(0.2)
    print("Gateway metrics:", gateway.snapshot())


if __name__ == "__main__":
    main()
Incrementally build the URL shortener from a single-node service to a concurrent cache-backed implementation and finally a resilient multi-region deployment.
URL Shortener Stack ├─ Level 1: Encoder Strategy → Repository ├─ Level 2: Thread-Safe Cache + Persistence └─ Level 3: Multi-Region Replication & Resiliency
Level 1 — Core Service
Design a URL shortening service that can create short codes, resolve them, and track hit counts. Sample Input: shorten("https://example.com/docs"), resolve(code). Sample Output: Short code string and 1 hit recorded.
from __future__ import annotations
import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Optional
import threading
@dataclass
class UrlRecord:
    """One shortened URL entry."""

    short_code: str
    long_url: str
    # Epoch seconds at creation time.
    created_at: float
    # Resolution count; incremented by the service on each resolve.
    hits: int = 0
class CodeEncoder(ABC):
    """Strategy interface turning a long URL into a short code."""

    @abstractmethod
    def encode(self, long_url: str) -> str:
        """Return a deterministic short code for `long_url`."""
        ...
class Md5Encoder(CodeEncoder):
    """Derives a 7-character code from the MD5 digest of the URL."""

    def encode(self, long_url: str) -> str:
        """First seven hex chars of md5(long_url)."""
        hashed = hashlib.md5(long_url.encode("utf-8"))
        return hashed.hexdigest()[:7]
class UrlRepository:
    """In-memory, lock-guarded store of UrlRecord keyed by short code."""

    def __init__(self) -> None:
        self._records: Dict[str, UrlRecord] = {}
        self._lock = threading.RLock()

    def save(self, record: UrlRecord) -> None:
        """Insert or overwrite the record under its short code."""
        with self._lock:
            self._records[record.short_code] = record

    def get(self, short_code: str) -> Optional[UrlRecord]:
        """Look up a record; None when the code is unknown."""
        with self._lock:
            try:
                return self._records[short_code]
            except KeyError:
                return None
class UrlShortener:
    """Core shorten/resolve service delegating storage to a repository."""

    def __init__(self, encoder: CodeEncoder, repository: UrlRepository) -> None:
        self.encoder = encoder
        self.repository = repository

    def shorten(self, long_url: str) -> str:
        """Create (or reuse) the short code for `long_url`."""
        code = self.encoder.encode(long_url)
        if self.repository.get(code) is None:
            self.repository.save(UrlRecord(code, long_url, time.time()))
        return code

    def resolve(self, code: str) -> str:
        """Return the long URL and count the hit; KeyError if unknown."""
        record = self._require(code)
        record.hits += 1
        return record.long_url

    def stats(self, code: str) -> UrlRecord:
        """Return the live record for `code`; KeyError if unknown."""
        return self._require(code)

    def _require(self, code: str) -> UrlRecord:
        # Shared lookup that converts a miss into KeyError.
        record = self.repository.get(code)
        if not record:
            raise KeyError("Unknown short code")
        return record
def main() -> None:
    """Demo: shorten one URL, resolve it once and print its stats."""
    repo = UrlRepository()
    service = UrlShortener(Md5Encoder(), repo)
    code = service.shorten("https://example.com/docs")
    print("Short code:", code)
    print("Redirect to:", service.resolve(code))
    print("Stats:", service.stats(code))


if __name__ == "__main__":
    main()
Level 2 — Concurrent Cache
Support concurrent requests across multiple threads with a write-through cache and delayed persistence. Sample Input: 4 worker threads shortening and resolving URLs. Sample Output: All operations succeed without race conditions and persisted snapshot updated once.
from __future__ import annotations
import threading
from collections import OrderedDict
from typing import Dict, Optional
import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class UrlRecord:
    """One shortened URL entry."""

    short_code: str
    long_url: str
    # Epoch seconds at creation time.
    created_at: float
    # Resolution count; incremented by the service on each resolve.
    hits: int = 0
class CodeEncoder(ABC):
    """Strategy interface turning a long URL into a short code."""

    @abstractmethod
    def encode(self, long_url: str) -> str:
        """Return a deterministic short code for `long_url`."""
        ...
class Md5Encoder(CodeEncoder):
    """Derives a 7-character code from the MD5 digest of the URL."""

    def encode(self, long_url: str) -> str:
        """First seven hex chars of md5(long_url)."""
        hashed = hashlib.md5(long_url.encode("utf-8"))
        return hashed.hexdigest()[:7]
class UrlRepository:
    """In-memory record store guarded by an RLock, with snapshot support."""

    def __init__(self) -> None:
        self._records: Dict[str, UrlRecord] = {}
        self._lock = threading.RLock()

    def save(self, record: UrlRecord) -> None:
        """Insert or overwrite `record` under its short code."""
        with self._lock:
            self._records[record.short_code] = record

    def get(self, short_code: str) -> Optional[UrlRecord]:
        """Record for `short_code`, or None."""
        with self._lock:
            return self._records.get(short_code)

    def snapshot(self) -> Dict[str, UrlRecord]:
        """Shallow copy of the mapping (record objects are shared)."""
        with self._lock:
            return dict(self._records)
class UrlShortener:
    """Core shorten/resolve service delegating storage to a repository."""

    def __init__(self, encoder: CodeEncoder, repository: UrlRepository) -> None:
        self.encoder = encoder
        self.repository = repository

    def shorten(self, long_url: str) -> str:
        """Create (or reuse) the short code for `long_url`."""
        code = self.encoder.encode(long_url)
        if self.repository.get(code) is None:
            self.repository.save(UrlRecord(code, long_url, time.time()))
        return code

    def resolve(self, code: str) -> str:
        """Return the long URL and count the hit; KeyError if unknown."""
        record = self._require(code)
        record.hits += 1
        return record.long_url

    def stats(self, code: str) -> UrlRecord:
        """Return the live record for `code`; KeyError if unknown."""
        return self._require(code)

    def _require(self, code: str) -> UrlRecord:
        # Shared lookup that converts a miss into KeyError.
        record = self.repository.get(code)
        if not record:
            raise KeyError("Unknown short code")
        return record
class ThreadSafeLRUCache:
    """LRU cache guarded by a re-entrant lock; evicts the least-recent entry."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._data: OrderedDict[str, UrlRecord] = OrderedDict()
        self._lock = threading.RLock()

    def get(self, key: str) -> Optional[UrlRecord]:
        """Fetch and mark `key` most-recently used; None on miss."""
        with self._lock:
            hit = self._data.get(key)
            if hit:
                self._data.move_to_end(key)
            return hit

    def put(self, key: str, record: UrlRecord) -> None:
        """Insert/refresh `key`, evicting the oldest entry past capacity."""
        with self._lock:
            self._data[key] = record
            self._data.move_to_end(key)
            while len(self._data) > self.capacity:
                self._data.popitem(last=False)
class CachedUrlShortener(UrlShortener):
    """UrlShortener with a write-through LRU cache and per-code locks.

    Cache entries alias the repository's UrlRecord objects, so hit counts
    incremented on cached records are visible through the repository too.
    """

    def __init__(self, encoder: CodeEncoder, repository: UrlRepository, cache_capacity: int = 256) -> None:
        super().__init__(encoder, repository)
        self.cache = ThreadSafeLRUCache(cache_capacity)
        # Per-short-code locks serializing resolve(). NOTE(review): this map
        # grows without bound as new codes are resolved — confirm acceptable.
        self._locks: Dict[str, threading.Lock] = {}
        self._locks_guard = threading.Lock()

    def _lock_for(self, key: str) -> threading.Lock:
        # Atomically get-or-create the lock dedicated to `key`.
        with self._locks_guard:
            return self._locks.setdefault(key, threading.Lock())

    def shorten(self, long_url: str) -> str:
        """Shorten via the base service, then warm the cache with the record."""
        code = super().shorten(long_url)
        record = self.repository.get(code)
        if record:
            self.cache.put(code, record)
        return code

    def resolve(self, code: str) -> str:
        """Resolve from cache when possible; fall back to the repository.

        Raises KeyError for unknown codes. The per-code lock keeps the hit
        increment and cache fill race-free for a given code.
        """
        lock = self._lock_for(code)
        with lock:
            cached = self.cache.get(code)
            if cached:
                cached.hits += 1
                return cached.long_url
            record = self.repository.get(code)
            if not record:
                raise KeyError("Unknown short code")
            record.hits += 1
            self.cache.put(code, record)
            return record.long_url
def worker(shortener: CachedUrlShortener, url: str) -> None:
    """Shorten then immediately resolve `url`, printing the round trip."""
    short = shortener.shorten(url)
    back = shortener.resolve(short)
    print(f"{url} -> {short} -> {back}")
def main() -> None:
    """Demo: six threads shorten/resolve concurrently, then dump the repo."""
    repo = UrlRepository()
    shortener = CachedUrlShortener(Md5Encoder(), repo, cache_capacity=4)
    urls = [f"https://service.local/resource/{i}" for i in range(6)]
    threads = [threading.Thread(target=worker, args=(shortener, url)) for url in urls]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    print("Repository snapshot:")
    for code, record in repo.snapshot().items():
        print(code, "->", record.long_url, "hits:", record.hits)


if __name__ == "__main__":
    main()
Level 3 — Resilient Multi-Region
Keep the shortener available across regions with circuit breaking around a flaky primary store and asynchronous replication to a fallback. Sample Input: 6 shorten/resolve calls with simulated store failures. Sample Output: Successful responses with logged fallbacks and replay to secondary store.
from __future__ import annotations
import random
import threading
import time
from queue import Queue
from typing import Optional
class UnstableRepository(UrlRepository):
    """UrlRepository whose operations randomly raise to mimic region outages."""

    def __init__(self, failure_rate: float = 0.35) -> None:
        super().__init__()
        self.failure_rate = failure_rate

    def _maybe_fail(self) -> None:
        # Roll the dice before every operation.
        if random.random() < self.failure_rate:
            raise RuntimeError("primary-region-unavailable")

    def save(self, record: UrlRecord) -> None:
        """Save, unless the simulated region is down."""
        self._maybe_fail()
        super().save(record)

    def get(self, short_code: str) -> Optional[UrlRecord]:
        """Get, unless the simulated region is down."""
        self._maybe_fail()
        return super().get(short_code)
class CircuitBreaker:
    """Breaker guarding arbitrary callables; raises 'circuit-open' while tripped."""

    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        """Run `func(*args, **kwargs)` under breaker supervision."""
        with self.lock:
            if self.state == "OPEN":
                cooled_down = time.time() - self.last_failure >= self.recovery_timeout
                if not cooled_down:
                    raise RuntimeError("circuit-open")
                self.state = "HALF_OPEN"
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                self.last_failure = time.time()
                if self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        with self.lock:
            self.failures = 0
            self.state = "CLOSED"
        return result
class MultiRegionShortener:
    """Shortener that writes through an always-on secondary region and
    asynchronously mirrors records to a flaky primary behind a breaker.

    Failed primary writes are queued and replayed by a daemon thread until
    they succeed.
    """

    def __init__(self, primary_repo: UnstableRepository, secondary_repo: UrlRepository) -> None:
        self.primary = CachedUrlShortener(Md5Encoder(), primary_repo, cache_capacity=256)
        self.secondary = CachedUrlShortener(Md5Encoder(), secondary_repo, cache_capacity=512)
        self.breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1.0)
        self.replay_queue: Queue[UrlRecord] = Queue()
        threading.Thread(target=self._replay_loop, daemon=True).start()

    def _replay_loop(self) -> None:
        # Endless replay: failed primary writes are retried until they stick.
        while True:
            record = self.replay_queue.get()
            try:
                # BUG FIX: UrlRepository has no `clone` method; the mirror
                # write is `save`. With `clone`, every replay raised
                # AttributeError and records could never reach the primary.
                self.breaker.call(self.primary.repository.save, record)
                print("Replayed to primary:", record.short_code)
            except Exception:
                time.sleep(0.4)
                self.replay_queue.put(record)

    def shorten(self, url: str) -> str:
        """Write to the secondary, then best-effort mirror to the primary."""
        code = self.secondary.shorten(url)
        record = self.secondary.repository.get(code)
        if not record:
            raise RuntimeError("unexpected-missing-record")
        try:
            # BUG FIX: was `clone` (nonexistent); `save` performs the mirror write.
            self.breaker.call(self.primary.repository.save, record)
        except Exception:
            print("Primary write failed, queueing for replay:", code)
            self.replay_queue.put(record)
        return code

    def resolve(self, code: str) -> str:
        """Resolve via the primary; fall back to the secondary on failure."""
        try:
            return self.breaker.call(self.primary.resolve, code)
        except Exception as exc:
            print("Primary resolve failed:", exc, "- using secondary")
            return self.secondary.resolve(code)
def main() -> None:
    """Demo: shorten six URLs against a flaky primary, then resolve each."""
    random.seed(21)  # deterministic failure pattern
    service = MultiRegionShortener(UnstableRepository(0.4), UrlRepository())
    urls = [f"https://docs.product/{i}" for i in range(6)]
    codes = [service.shorten(url) for url in urls]
    for code in codes:
        print("Resolve", code, "->", service.resolve(code))
        time.sleep(0.2)


if __name__ == "__main__":
    main()
Grow the ticketing service from basic seat allocation to concurrent hold handling and finally a resilient saga-based workflow.
Ticketing Stack ├─ Level 1: Seat Selector → Repository ├─ Level 2: Hold Manager + Concurrent Workers └─ Level 3: Saga Orchestrator with Payment Integration
Level 1 — Core Seat Booking
Design a ticket booking service that allocates best available seats while keeping track of reservations. Sample Input: request 2 premium seats for show S1. Sample Output: Seats (P1,P2) booked and marked unavailable.
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional, Optional
import threading
@dataclass
class Seat:
    """A single seat in a show's inventory."""

    seat_id: str
    # Pricing tier, e.g. "PREMIUM", "GOLD", "SILVER" in the demo.
    category: str
    is_booked: bool = False
class SeatRepository:
    """Lock-guarded, in-memory seat inventory keyed by show id."""

    def __init__(self) -> None:
        self._shows: Dict[str, Dict[str, Seat]] = {}
        self._lock = threading.RLock()

    def add_show(self, show_id: str, seats: List[Seat]) -> None:
        """Register (or replace) a show's full seat map."""
        with self._lock:
            self._shows[show_id] = {seat.seat_id: seat for seat in seats}

    def list_available(self, show_id: str, category: Optional[str] = None) -> List[Seat]:
        """Unbooked seats for a show, optionally filtered by category."""
        with self._lock:
            seat_map = self._shows.get(show_id, {})
            matches = []
            for seat in seat_map.values():
                if seat.is_booked:
                    continue
                if category is not None and seat.category != category:
                    continue
                matches.append(seat)
            return matches

    def mark_booked(self, show_id: str, seat_ids: List[str]) -> None:
        """Mark the given seats booked under the repository lock.

        Raises KeyError for unknown shows/seats, RuntimeError for a seat
        already booked. Seats earlier in `seat_ids` stay booked if a later
        seat fails (matches the original partial-update semantics).
        """
        with self._lock:
            seat_map = self._shows.get(show_id)
            if not seat_map:
                raise KeyError(f"Unknown show {show_id}")
            for seat_id in seat_ids:
                seat = seat_map.get(seat_id)
                if not seat:
                    raise KeyError(f"Unknown seat {seat_id}")
                if seat.is_booked:
                    raise RuntimeError(f"Seat {seat_id} already booked")
                seat.is_booked = True
class SeatSelector(ABC):
    """Strategy interface choosing which available seats to take."""

    @abstractmethod
    def select(self, seats: List[Seat], quantity: int) -> List[Seat]:
        """Return up to `quantity` seats chosen from `seats`."""
        ...
class BestAvailableSelector(SeatSelector):
    """Picks the lexicographically-first seat ids up to the requested count."""

    def select(self, seats: List[Seat], quantity: int) -> List[Seat]:
        """Sort by seat id and take the first `quantity` seats."""
        by_id = sorted(seats, key=lambda s: s.seat_id)
        return by_id[:quantity]
class BookingService:
    """Books best-available seats through a pluggable selector."""

    def __init__(self, repository: SeatRepository, selector: SeatSelector) -> None:
        self.repository = repository
        self.selector = selector

    def book(self, show_id: str, category: str, quantity: int) -> List[str]:
        """Select and book `quantity` seats; RuntimeError when short."""
        candidates = self.repository.list_available(show_id, category)
        picked = self.selector.select(candidates, quantity)
        if len(picked) < quantity:
            raise RuntimeError("Insufficient seats")
        seat_ids = [seat.seat_id for seat in picked]
        self.repository.mark_booked(show_id, seat_ids)
        return seat_ids
def main() -> None:
    """Demo: book two premium seats and show what remains."""
    repo = SeatRepository()
    repo.add_show("S1", [
        Seat("P1", "PREMIUM"),
        Seat("P2", "PREMIUM"),
        Seat("P3", "PREMIUM"),
        Seat("G1", "GOLD"),
        Seat("S1", "SILVER"),
    ])
    service = BookingService(repo, BestAvailableSelector())
    booked = service.book("S1", "PREMIUM", 2)
    print("Booked seats:", booked)
    print("Remaining premium seats:", [seat.seat_id for seat in repo.list_available("S1", "PREMIUM")])


if __name__ == "__main__":
    main()
Level 2 — Concurrent Seat Holds
Handle concurrent seat requests by holding seats atomically and expiring holds to avoid deadlocks. Sample Input: 4 threads booking overlapping seats. Sample Output: Each seat assigned to only one booking, expired holds released.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class SeatHold:
    """A temporary claim on seats for one customer."""

    show_id: str
    seat_ids: List[str]
    customer_id: str
    # Absolute epoch-seconds deadline after which the hold is invalid.
    expires_at: float
class SeatHoldManager:
    """Tracks short-lived seat holds with background expiry.

    Holds are keyed by (show_id, seat_id); a daemon sweeper purges expired
    entries every `hold_seconds / 2`.
    """

    def __init__(self, repository: SeatRepository, hold_seconds: float = 3.0) -> None:
        self.repository = repository
        self.hold_seconds = hold_seconds
        # (show_id, seat_id) -> active SeatHold
        self._holds: Dict[tuple[str, str], SeatHold] = {}
        self._lock = threading.RLock()
        self._sweeper = threading.Thread(target=self._sweep_loop, daemon=True)
        self._sweeper.start()

    def _sweep_loop(self) -> None:
        # Daemon loop: periodically drop expired holds.
        while True:
            time.sleep(self.hold_seconds / 2)
            self._purge_expired()

    def _purge_expired(self) -> None:
        # Remove all holds whose deadline has passed.
        now = time.time()
        with self._lock:
            expired = [key for key, hold in self._holds.items() if hold.expires_at <= now]
            for key in expired:
                del self._holds[key]

    def available(self, show_id: str, category: str) -> List[Seat]:
        """Seats that are neither booked nor currently held."""
        self._purge_expired()
        with self._lock:
            held = {
                seat_id for (s_id, seat_id), hold in self._holds.items()
                if s_id == show_id and hold.expires_at > time.time()
            }
            return [
                seat for seat in self.repository.list_available(show_id, category)
                if seat.seat_id not in held
            ]

    def hold(self, show_id: str, seat_id: str, customer_id: str) -> bool:
        """Try to place a hold; False when another live hold owns the seat."""
        now = time.time()
        expires_at = now + self.hold_seconds
        key = (show_id, seat_id)
        with self._lock:
            self._purge_expired()
            existing = self._holds.get(key)
            if existing and existing.expires_at > now:
                return False
            self._holds[key] = SeatHold(show_id, [seat_id], customer_id, expires_at)
            return True

    def release(self, show_id: str, seat_ids: List[str]) -> None:
        """Drop holds for the given seats (missing keys are ignored)."""
        with self._lock:
            for seat_id in seat_ids:
                self._holds.pop((show_id, seat_id), None)
class ConcurrentBookingService:
    """Hold-then-confirm booking flow safe for concurrent callers."""

    def __init__(self, repository: SeatRepository, holds: SeatHoldManager, selector: SeatSelector) -> None:
        self.repository = repository
        self.holds = holds
        self.selector = selector

    def hold(self, show_id: str, customer_id: str, category: str, quantity: int) -> SeatHold:
        """Hold up to `quantity` seats; rolls back partial holds on shortfall."""
        acquired: List[str] = []
        candidates = self.selector.select(self.holds.available(show_id, category), quantity)
        for seat in candidates:
            if not self.holds.hold(show_id, seat.seat_id, customer_id):
                continue
            acquired.append(seat.seat_id)
            if len(acquired) == quantity:
                break
        if len(acquired) < quantity:
            if acquired:
                self.holds.release(show_id, acquired)
            raise RuntimeError("Insufficient seats")
        return SeatHold(show_id, acquired, customer_id, time.time() + self.holds.hold_seconds)

    def confirm(self, hold: SeatHold) -> List[str]:
        """Persist the booking and drop the holds."""
        self.repository.mark_booked(hold.show_id, hold.seat_ids)
        self.holds.release(hold.show_id, hold.seat_ids)
        return hold.seat_ids

    def release(self, hold: SeatHold) -> None:
        """Free held seats without booking them."""
        self.holds.release(hold.show_id, hold.seat_ids)

    def book(self, show_id: str, customer_id: str, category: str, quantity: int) -> List[str]:
        """Hold + confirm; releases the hold if confirmation fails."""
        pending = self.hold(show_id, customer_id, category, quantity)
        try:
            return self.confirm(pending)
        except Exception:
            self.release(pending)
            raise
def worker(service: ConcurrentBookingService, customer: str, qty: int) -> None:
    """Attempt one booking for `customer` and report the outcome."""
    try:
        seats = service.book("S1", customer, "PREMIUM", qty)
    except Exception as exc:
        print(customer, "failed:", exc)
    else:
        print(customer, "got", seats)
def main() -> None:
    """Demo: four customers race for six premium seats, two each."""
    repo = SeatRepository()
    repo.add_show("S1", [Seat(f"P{i}", "PREMIUM") for i in range(1, 7)])
    holds = SeatHoldManager(repo, hold_seconds=1.5)
    service = ConcurrentBookingService(repo, holds, BestAvailableSelector())
    threads = [threading.Thread(target=worker, args=(service, f"C{i}", 2)) for i in range(1, 5)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()
Level 3 — Resilient Booking Saga
Guarantee consistency when dependent services like payment or inventory can fail by orchestrating a saga with compensation and a waitlist fallback. Sample Input: Book 3 orders with intermittent payment failure. Sample Output: Successful bookings committed, failures moved to waitlist and inventory restored.
from __future__ import annotations
import random
import time
import threading
from dataclasses import dataclass, field
from typing import List
class PaymentGateway:
    """Simulated processor that declines at `failure_rate` probability."""

    def __init__(self, failure_rate: float = 0.35) -> None:
        self.failure_rate = failure_rate

    def charge(self, order_id: str, amount: float) -> str:
        """Return a receipt id or raise RuntimeError('payment-declined')."""
        declined = random.random() < self.failure_rate
        if declined:
            raise RuntimeError("payment-declined")
        return f"rcpt-{order_id}-{int(time.time() * 1000)}"
class CircuitBreaker:
    """Breaker guarding the payment call; raises 'payment-circuit-open' while tripped."""

    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        """Run `func(*args, **kwargs)` under breaker supervision."""
        with self.lock:
            if self.state == "OPEN":
                cooled_down = time.time() - self.last_failure >= self.recovery_timeout
                if not cooled_down:
                    raise RuntimeError("payment-circuit-open")
                self.state = "HALF_OPEN"
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                self.last_failure = time.time()
                if self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        with self.lock:
            self.failures = 0
            self.state = "CLOSED"
        return result
@dataclass
class OrderResult:
    """Outcome of one saga execution."""

    order_id: str
    # Human-readable status, e.g. "CONFIRMED receipt=...", "WAITLISTED (...)".
    status: str
    # Seat ids actually booked; empty unless the order confirmed.
    seats: List[str] = field(default_factory=list)
class Waitlist:
    """Thread-safe append-only list of deferred order ids."""

    def __init__(self) -> None:
        self._orders: List[str] = []
        self._lock = threading.Lock()

    def add(self, order_id: str) -> None:
        """Record an order for later handling."""
        with self._lock:
            self._orders.append(order_id)

    def snapshot(self) -> List[str]:
        """Defensive copy of the current contents."""
        with self._lock:
            return self._orders[:]
class BookingSaga:
    """Hold → pay → confirm orchestration with compensation.

    On any failure after the hold is taken, the hold is released and the
    order is parked on the waitlist.
    """

    def __init__(self,
                 ticketing: ConcurrentBookingService,
                 payment: PaymentGateway,
                 breaker: CircuitBreaker,
                 waitlist: Waitlist,
                 max_payment_attempts: int = 3) -> None:
        self.ticketing = ticketing
        self.payment = payment
        self.breaker = breaker
        self.waitlist = waitlist
        self.max_payment_attempts = max_payment_attempts

    def _charge_with_retry(self, order_id: str, amount: float) -> str:
        """Charge through the breaker with linear backoff; re-raises the last error."""
        attempt = 0
        while True:
            try:
                return self.breaker.call(self.payment.charge, order_id, amount)
            except Exception as exc:
                attempt += 1
                if attempt >= self.max_payment_attempts:
                    raise
                backoff = 0.2 * attempt
                print(f"{order_id}: retrying payment in {backoff:.2f}s ({exc})")
                time.sleep(backoff)

    def execute(self, order_id: str, show_id: str, category: str, quantity: int, amount: float) -> OrderResult:
        """Run the saga for one order and return its outcome."""
        try:
            hold = self.ticketing.hold(show_id, order_id, category, quantity)
        except RuntimeError as exc:
            self.waitlist.add(order_id)
            return OrderResult(order_id, f"WAITLISTED ({exc})")
        try:
            receipt = self._charge_with_retry(order_id, amount)
            # BUG FIX: seat confirmation used to run inside the payment retry
            # loop, so a confirm failure after a successful charge triggered
            # another charge attempt (double billing). Payment is now retried
            # in isolation and confirm runs exactly once per receipt.
            seats = self.ticketing.confirm(hold)
            return OrderResult(order_id, f"CONFIRMED receipt={receipt}", seats)
        except Exception as exc:
            # Compensation: free the held seats and defer the order.
            self.ticketing.release(hold)
            self.waitlist.add(order_id)
            return OrderResult(order_id, f"FAILED payment ({exc})")
def main() -> None:
    """Demo: push three premium orders through the saga against a small show."""
    random.seed(10)
    repo = SeatRepository()
    repo.add_show("S1", [Seat(f"P{i}", "PREMIUM") for i in range(1, 7)])
    holds = SeatHoldManager(repo, hold_seconds=2.0)
    ticketing = ConcurrentBookingService(repo, holds, BestAvailableSelector())
    saga = BookingSaga(
        ticketing=ticketing,
        payment=PaymentGateway(failure_rate=0.4),
        breaker=CircuitBreaker(failure_threshold=2, recovery_timeout=1.0),
        waitlist=Waitlist(),
    )
    for spec in (
        ("O1", "S1", "PREMIUM", 2, 120.0),
        ("O2", "S1", "PREMIUM", 2, 150.0),
        ("O3", "S1", "PREMIUM", 2, 110.0),
    ):
        print(saga.execute(*spec))
    print("Waitlist:", saga.waitlist.snapshot())


if __name__ == "__main__":
    main()
Progressively enhance the library platform from core circulation to concurrent reservations and finally a resilient catalog tier with caching.
Library System Stack ├─ Level 1: Circulation Service → Repository ├─ Level 2: Reservation Manager with Locks └─ Level 3: Resilient Catalog & Cache
Level 1 — Core Circulation
Implement a library catalog that supports multiple search strategies and book checkouts with due dates. Sample Input: checkout("9780132350884", patronA). Sample Output: Patron A borrowing record created and copies decremented.
# Fixed imports: the original `from dataclasses import dataclasstime, timedelta`
# was mangled — `dataclass` comes from dataclasses, while `datetime`/`timedelta`
# (used for loan due dates) come from the datetime module.  Also dropped the
# duplicated `Optional`.
from __future__ import annotations
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
@dataclass
class Book:
    """Catalog entry; `copies` is the number currently available to lend."""

    isbn: str
    title: str
    author: str
    copies: int
class CatalogRepository:
    """In-memory, lock-guarded store of books keyed by ISBN."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._books: Dict[str, Book] = {}

    def add(self, book: Book) -> None:
        """Insert or replace the book under its ISBN."""
        with self._lock:
            self._books[book.isbn] = book

    def get(self, isbn: str) -> Optional[Book]:
        """Return the book for `isbn`, or None when absent."""
        with self._lock:
            return self._books.get(isbn)

    def all_books(self) -> List[Book]:
        """Return a snapshot list of every stored book."""
        with self._lock:
            return [*self._books.values()]
class SearchStrategy(ABC):
    """Strategy interface: decide whether a book matches a free-text query."""

    @abstractmethod
    def matches(self, book: Book, query: str) -> bool:
        """Return True when `book` satisfies `query`."""
        ...
class TitleSearch(SearchStrategy):
    """Case-insensitive substring match on the book title."""

    def matches(self, book: Book, query: str) -> bool:
        needle = query.lower()
        return needle in book.title.lower()
class AuthorSearch(SearchStrategy):
    """Case-insensitive substring match on the author name."""

    def matches(self, book: Book, query: str) -> bool:
        needle = query.lower()
        return needle in book.author.lower()
@dataclass
class Loan:
    """A single active borrowing of one ISBN by one patron."""

    isbn: str
    patron: str
    due_date: datetime  # when the copy must be returned
class LibraryService:
    """Circulation facade: search the catalog, check books out and back in.

    Not thread-safe by itself; Level 2 wraps it with per-ISBN locks.
    """

    LOAN_DAYS = 14  # default loan period (was a hard-coded literal)

    def __init__(self, catalog: CatalogRepository) -> None:
        self.catalog = catalog
        # Active loans keyed by "isbn:patron".
        self.loans: Dict[str, Loan] = {}

    def search(self, strategy: SearchStrategy, query: str) -> List[Book]:
        """Return every catalog book accepted by `strategy` for `query`."""
        return [book for book in self.catalog.all_books() if strategy.matches(book, query)]

    def checkout(self, isbn: str, patron: str) -> Loan:
        """Lend one copy of `isbn` to `patron`.

        Raises RuntimeError('Unavailable') when the book is unknown or has no
        copies left.  The guard uses <= 0 (not == 0) so a racy or corrupted
        negative count can never be decremented further.
        """
        book = self.catalog.get(isbn)
        if not book or book.copies <= 0:
            raise RuntimeError("Unavailable")
        book.copies -= 1
        # NOTE(review): datetime.utcnow() is deprecated in 3.12+; kept because
        # the module's existing imports expose only naive datetimes.
        loan = Loan(isbn, patron, datetime.utcnow() + timedelta(days=self.LOAN_DAYS))
        self.loans[f"{isbn}:{patron}"] = loan
        return loan

    def checkin(self, isbn: str, patron: str) -> None:
        """Return `patron`'s copy of `isbn`; unknown loans are silently ignored."""
        loan = self.loans.pop(f"{isbn}:{patron}", None)
        if loan:
            book = self.catalog.get(isbn)
            if book:
                book.copies += 1
def main() -> None:
    """Level 1 demo: seed two books, search by title, then check one out."""
    catalog = CatalogRepository()
    for book in (
        Book("9780132350884", "Clean Code", "Robert C. Martin", 2),
        Book("9781617296086", "System Design Interview", "Alex Xu", 1),
    ):
        catalog.add(book)
    service = LibraryService(catalog)
    print("Search results:", service.search(TitleSearch(), "clean"))
    print("Loan:", service.checkout("9780132350884", "patronA"))
    print("Remaining copies:", catalog.get("9780132350884"))


if __name__ == "__main__":
    main()
Level 2 — Concurrent Reservations
Allow concurrent reservation requests with per-book locks and notify waitlisted patrons when copies are returned. Sample Input: 3 threads reserving the same book. Sample Output: Exactly one checkout succeeds, others queued and later notified.
from __future__ import annotations
import threading
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Optional
class ConcurrentReservationService:
    """Serializes reservations per ISBN and notifies waitlisted patrons on return."""

    def __init__(self, library: LibraryService, notifier: Callable[[str, str], None]) -> None:
        self.library = library
        self.notifier = notifier
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)
        self._waitlists: Dict[str, Deque[str]] = defaultdict(deque)

    def reserve(self, isbn: str, patron: str) -> Optional[Loan]:
        """Check out a copy if one is free, otherwise queue the patron."""
        with self._locks[isbn]:
            book = self.library.catalog.get(isbn)
            if book is None or book.copies <= 0:
                self._waitlists[isbn].append(patron)
                print(f"{patron} waitlisted for {isbn}")
                return None
            loan = self.library.checkout(isbn, patron)
            print(f"{patron} checked out {isbn}")
            return loan

    def return_copy(self, loan: Loan) -> None:
        """Check the copy back in; notify the next waiter outside the lock."""
        next_patron: Optional[str] = None
        with self._locks[loan.isbn]:
            self.library.checkin(loan.isbn, loan.patron)
            queue = self._waitlists[loan.isbn]
            if queue:
                next_patron = queue.popleft()
        if next_patron:
            self.notifier(loan.isbn, next_patron)
def notifier(isbn: str, patron: str) -> None:
    """Console notification hook used by the reservation service."""
    message = f"Notify {patron}: {isbn} is available"
    print(message)
def worker(service: ConcurrentReservationService, isbn: str, patron: str) -> None:
    """Reserve, briefly hold, then return a copy (no-op when waitlisted)."""
    loan = service.reserve(isbn, patron)
    if loan is None:
        return
    time.sleep(0.2)
    service.return_copy(loan)
def main() -> None:
    """Level 2 demo: three patrons race for a single copy."""
    catalog = CatalogRepository()
    catalog.add(Book("9780132350884", "Clean Code", "Robert C. Martin", 1))
    catalog.add(Book("9781491950357", "Site Reliability Engineering", "Betsy Beyer", 2))
    service = ConcurrentReservationService(LibraryService(catalog), notifier)
    racers = [
        threading.Thread(target=worker, args=(service, "9780132350884", f"user{i}"))
        for i in range(3)
    ]
    for t in racers:
        t.start()
    for t in racers:
        t.join()


if __name__ == "__main__":
    main()
Level 3 — Resilient Catalog Service
Keep catalog queries responsive when the backing datastore is flaky by layering a cache, retries, and a stale-read fallback. Sample Input: 5 search queries with simulated primary failures. Sample Output: Cache hits served, retries logged, and stale data used when necessary.
from __future__ import annotations
import random
import threading
import time
from typing import Callable, Dict, List, Optional, Tuple
class FlakyCatalogRepository(CatalogRepository):
    """CatalogRepository that randomly raises, simulating an unreliable primary store."""

    def __init__(self, failure_rate: float = 0.3) -> None:
        super().__init__()
        self.failure_rate = failure_rate  # probability that each operation fails

    def _maybe_fail(self) -> None:
        # Every public operation rolls the dice before touching real storage.
        if random.random() < self.failure_rate:
            raise RuntimeError("primary-unavailable")

    def add(self, book: Book) -> None:
        self._maybe_fail()
        super().add(book)

    def get(self, isbn: str) -> Optional[Book]:
        self._maybe_fail()
        return super().get(isbn)

    def all_books(self) -> List[Book]:
        self._maybe_fail()
        return super().all_books()
class CircuitBreaker:
    """CLOSED -> OPEN -> HALF_OPEN breaker guarding catalog calls.

    Fix over the original: a failure during the HALF_OPEN probe re-opens the
    circuit immediately. Previously a failed probe only incremented the
    failure counter, so the breaker needed `failure_threshold` fresh failures
    after every cool-down before tripping again.
    """

    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold  # consecutive failures before tripping
        self.recovery_timeout = recovery_timeout  # seconds OPEN before a probe is allowed
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        """Invoke `func`; raises RuntimeError('circuit-open') while tripped."""
        with self.lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure >= self.recovery_timeout:
                    self.state = "HALF_OPEN"  # allow one probe call through
                else:
                    raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                self.last_failure = time.time()
                # Trip on a failed probe OR once the threshold is reached.
                if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        else:
            with self.lock:
                self.failures = 0
                self.state = "CLOSED"
            return result
class ResilientLibraryGateway:
    """Routes library operations to a flaky primary with a warm secondary fallback.

    Writes are dual-written to both stores; reads try the primary through a
    circuit breaker, then a per-query cache (stale reads), then the secondary.
    """

    def __init__(self, primary_repo: FlakyCatalogRepository, secondary_repo: CatalogRepository, notifier: Callable[[str, str], None]) -> None:
        self.primary_repo = primary_repo
        self.secondary_repo = secondary_repo
        self.primary_service = LibraryService(self.primary_repo)
        self.secondary_service = LibraryService(self.secondary_repo)
        self.primary_reservations = ConcurrentReservationService(self.primary_service, notifier)
        self.secondary_reservations = ConcurrentReservationService(self.secondary_service, notifier)
        self.breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=1.0)
        # Stale-read cache keyed by (strategy class name, lowercased query).
        self.cache: Dict[Tuple[str, str], List[Book]] = {}

    def add_book(self, book: Book) -> None:
        """Dual-write with independent copies so the stores never share state."""
        primary_copy = Book(book.isbn, book.title, book.author, book.copies)
        secondary_copy = Book(book.isbn, book.title, book.author, book.copies)
        try:
            self.breaker.call(self.primary_repo.add, primary_copy)
        except Exception as exc:
            print("Primary add failed:", exc)
        # Secondary is written unconditionally so the fallback stays complete.
        self.secondary_repo.add(secondary_copy)

    def search(self, strategy: SearchStrategy, query: str) -> List[Book]:
        """Primary search -> cached (possibly stale) results -> secondary."""
        key = (strategy.__class__.__name__, query.lower())
        try:
            results = self.breaker.call(self.primary_service.search, strategy, query)
            self.cache[key] = results  # refresh the stale-read cache on success
            return results
        except Exception as exc:
            print("Primary search failed:", exc)
            if key in self.cache:
                print("Serving cached results for", query)
                return self.cache[key]
            return self.secondary_service.search(strategy, query)

    def checkout(self, isbn: str, patron: str, category: str) -> Optional[Loan]:
        """Reserve on the primary, falling back to the secondary on error.

        NOTE(review): `category` is unused here, and a patron waitlisted on
        the primary (loan is None) still falls through to the secondary —
        confirm both are intended.
        """
        try:
            loan = self.primary_reservations.reserve(isbn, patron)
            if loan:
                return loan
        except Exception as exc:
            print("Primary checkout error:", exc)
        return self.secondary_reservations.reserve(isbn, patron)

    def checkin(self, loan: Loan) -> None:
        """Return the copy on the primary; on error, return it on the secondary."""
        try:
            self.primary_reservations.return_copy(loan)
        except Exception as exc:
            print("Primary checkin error:", exc)
            self.secondary_reservations.return_copy(loan)
def main() -> None:
    """Level 3 demo: flaky primary served through cache + secondary fallback."""
    random.seed(5)
    gateway = ResilientLibraryGateway(FlakyCatalogRepository(0.4), CatalogRepository(), notifier)
    for book in (
        Book("9780132350884", "Clean Code", "Robert C. Martin", 1),
        Book("9781491950357", "Site Reliability Engineering", "Betsy Beyer", 2),
        Book("9780134494166", "Clean Architecture", "Robert C. Martin", 1),
    ):
        gateway.add_book(book)
    for term in ("clean", "reliability", "clean"):
        hits = gateway.search(TitleSearch(), term)
        print(term, "->", [hit.title for hit in hits])
    loans = []
    for patron in ("patronA", "patronB", "patronC"):
        loan = gateway.checkout("9780132350884", patron, "PREMIUM")
        if loan:
            loans.append(loan)
    for loan in loans:
        gateway.checkin(loan)


if __name__ == "__main__":
    main()
Build the Splitwise-like system progressively from a basic ledger to concurrent settlement handling and resilient reconciliation.
Splitwise Platform ├─ Level 1: Ledger + Expense Aggregation ├─ Level 2: Concurrent Settlement Engine └─ Level 3: Resilient Settlement Coordinator
Level 1 — Core Implementation
Build a Splitwise-style service that records expenses, generates simplified balances, and notifies users. Sample Input: split("outing", 120, ["A","B","C"]). Sample Output: Balances showing who owes whom.
from __future__ import annotations
import threading
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
@dataclass
class Expense:
    """One shared expense; the ledger treats the first participant as the payer."""

    title: str
    amount: float
    participants: List[str]  # payer first, then everyone sharing the cost
class BalanceLedger:
    """Thread-safe ledger of shared expenses and per-user net balances.

    Positive balance = the user is owed money; negative = the user owes.
    """

    # Tolerance for float settlement arithmetic: balances within EPS of zero
    # count as settled.  Fixes the original exact `== 0` float comparisons,
    # which could emit spurious ~1e-17 settlements from rounding residue.
    EPS = 1e-9

    def __init__(self) -> None:
        self.expenses: List[Expense] = []
        self.balances: Dict[str, float] = defaultdict(float)
        self._lock = threading.RLock()

    def record(self, expense: Expense) -> None:
        """Split `expense.amount` equally; the first participant is the payer.

        Raises ValueError for an expense with no participants (previously an
        opaque ZeroDivisionError).
        """
        if not expense.participants:
            raise ValueError("expense requires at least one participant")
        with self._lock:
            share = expense.amount / len(expense.participants)
            payer = expense.participants[0]
            for user in expense.participants:
                if user == payer:
                    self.balances[user] += expense.amount - share
                else:
                    self.balances[user] -= share
            self.expenses.append(expense)

    def summary(self) -> Dict[str, float]:
        """Return a snapshot of net balances."""
        with self._lock:
            return dict(self.balances)

    def simplify(self) -> List[Tuple[str, str, float]]:
        """Greedily match largest debtors to largest creditors.

        All comparisons are EPS-tolerant so floating-point residue can neither
        create near-zero transfers nor keep a settled party in the loop.
        """
        with self._lock:
            debtors = [(user, -amount) for user, amount in self.balances.items() if amount < -self.EPS]
            creditors = [(user, amount) for user, amount in self.balances.items() if amount > self.EPS]
            debtors.sort(key=lambda x: x[1], reverse=True)
            creditors.sort(key=lambda x: x[1], reverse=True)
            settlements: List[Tuple[str, str, float]] = []
            i = j = 0
            while i < len(debtors) and j < len(creditors):
                debtor, debt = debtors[i]
                creditor, credit = creditors[j]
                pay = min(debt, credit)
                if pay > self.EPS:
                    settlements.append((debtor, creditor, pay))
                debt -= pay
                credit -= pay
                if debt <= self.EPS:
                    i += 1
                else:
                    debtors[i] = (debtor, debt)
                if credit <= self.EPS:
                    j += 1
                else:
                    creditors[j] = (creditor, credit)
            return settlements
class Notifier:
    """Minimal notification sink that writes to stdout."""

    def notify(self, message: str) -> None:
        """Emit a single notification line."""
        print("Notify:", message)
class SplitwiseService:
    """Facade that records expenses on the ledger and pings the notifier."""

    def __init__(self, ledger: BalanceLedger, notifier: Notifier) -> None:
        self.ledger = ledger
        self.notifier = notifier

    def add_expense(self, title: str, amount: float, participants: List[str]) -> None:
        """Record a new equally-split expense and announce it."""
        self.ledger.record(Expense(title, amount, participants))
        self.notifier.notify(f"Expense {title} recorded for {participants}")

    def balances(self) -> Dict[str, float]:
        """Snapshot of net balances from the ledger."""
        return self.ledger.summary()
def main() -> None:
    """Level 1 demo: record two expenses and show net balances."""
    service = SplitwiseService(BalanceLedger(), Notifier())
    for title, amount, people in (
        ("Dinner", 120.0, ["A", "B", "C"]),
        ("Cab", 60.0, ["B", "A"]),
    ):
        service.add_expense(title, amount, people)
    print("Balances:", service.balances())


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Allow concurrent expense postings and settlements while preventing race conditions. Sample Input: 5 threads posting expenses. Sample Output: Ledger remains consistent and settlements executed without conflicts.
from __future__ import annotations
import threading
from typing import Dict, List, Optional, Tuple
class AccountLockManager:
    """Hands out per-user locks, always acquired in sorted order to avoid deadlock."""

    def __init__(self) -> None:
        self._guard = threading.Lock()
        self._locks: Dict[str, threading.Lock] = {}

    def acquire(self, users: List[str]) -> List[threading.Lock]:
        """Acquire one lock per distinct user (sorted order) and return them."""
        wanted: List[threading.Lock] = []
        with self._guard:
            for user in sorted(set(users)):
                wanted.append(self._locks.setdefault(user, threading.Lock()))
        for lock in wanted:
            lock.acquire()
        return wanted

    @staticmethod
    def release(locks: List[threading.Lock]) -> None:
        """Release in reverse acquisition order."""
        for lock in reversed(locks):
            lock.release()
class ConcurrentSplitwiseService:
    """Wraps SplitwiseService so each posting holds every participant's lock."""

    def __init__(self, service: SplitwiseService) -> None:
        self.service = service
        self.lock_manager = AccountLockManager()

    def add_expense(self, title: str, amount: float, participants: List[str]) -> None:
        """Post an expense while all participant accounts are locked."""
        held = self.lock_manager.acquire(participants)
        try:
            self.service.add_expense(title, amount, participants)
        finally:
            AccountLockManager.release(held)

    def settlements(self) -> List[Tuple[str, str, float]]:
        """Delegate to the ledger's debt simplification."""
        return self.service.ledger.simplify()
def worker(service: ConcurrentSplitwiseService, idx: int) -> None:
    """Post one synthetic group expense; the amount varies by worker index."""
    amount = 40.0 + idx * 5
    service.add_expense(f"expense-{idx}", amount, ["A", "B", "C"])
def main() -> None:
    """Level 2 demo: five concurrent expense postings stay consistent."""
    core = SplitwiseService(BalanceLedger(), Notifier())
    locked = ConcurrentSplitwiseService(core)
    posters = [threading.Thread(target=worker, args=(locked, i)) for i in range(5)]
    for thread in posters:
        thread.start()
    for thread in posters:
        thread.join()
    print("Balances:", core.balances())
    print("Settlements:", locked.settlements())


if __name__ == "__main__":
    main()
Level 3 — Resilient Settlement Coordinator
Coordinate settlements against flaky payment processors by wiring the concurrent service through a circuit breaker and a durable outbox. Sample Input: three settlement batches with intermittent failures. Sample Output: Successful transfers, with failing commands retried with backoff and requeued when they keep failing.
from __future__ import annotations
import random
import threading
import time
from dataclasses import dataclass
from queue import Queue
from typing import List, Tuple
@dataclass
class SettlementCommand:
    """A single debtor -> creditor transfer produced from simplified balances."""

    debtor: str
    creditor: str
    amount: float
class PaymentGateway:
    """Simulated money-transfer processor with a configurable failure rate."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def transfer(self, command: SettlementCommand) -> None:
        """Execute the transfer, or raise RuntimeError('processor-down')."""
        if random.random() < self.failure_rate:
            raise RuntimeError("processor-down")
        print(f"Transferred {command.amount:.2f} from {command.debtor} to {command.creditor}")
class CircuitBreaker:
    """CLOSED -> OPEN -> HALF_OPEN breaker guarding settlement transfers.

    Fix over the original: a failure during the HALF_OPEN probe re-opens the
    circuit immediately. Previously a failed probe only incremented the
    failure counter, so the breaker needed `failure_threshold` fresh failures
    after every cool-down before tripping again.
    """

    def __init__(self, failure_threshold: int, recovery_timeout: float) -> None:
        self.failure_threshold = failure_threshold  # consecutive failures before tripping
        self.recovery_timeout = recovery_timeout  # seconds OPEN before a probe is allowed
        self.failures = 0
        self.last_failure = 0.0
        self.state = "CLOSED"
        self._lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        """Invoke `func`; raises RuntimeError('circuit-open') while tripped."""
        with self._lock:
            if self.state == "OPEN":
                if time.time() - self.last_failure >= self.recovery_timeout:
                    self.state = "HALF_OPEN"  # allow one probe call through
                else:
                    raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self._lock:
                self.failures += 1
                self.last_failure = time.time()
                # Trip on a failed probe OR once the threshold is reached.
                if self.state == "HALF_OPEN" or self.failures >= self.failure_threshold:
                    self.state = "OPEN"
                    self.failures = 0
            raise
        else:
            with self._lock:
                self.failures = 0
                self.state = "CLOSED"
            return result
class ResilientSettlementCoordinator:
    """Drains settlement commands from an outbox through a circuit breaker.

    Each command gets up to 3 attempts with exponential backoff.  A command
    that exhausts its attempts is put back on the outbox and draining stops
    for this run (the `break` in the while-else targets the outer loop),
    leaving remaining commands queued for a later pass.
    """

    def __init__(self, service: ConcurrentSplitwiseService, gateway: PaymentGateway, breaker: CircuitBreaker) -> None:
        self.service = service
        self.gateway = gateway
        self.breaker = breaker
        # In-process stand-in for a durable outbox of pending transfers.
        self.outbox: "Queue[SettlementCommand]" = Queue()

    def enqueue_settlements(self) -> None:
        """Convert the current simplified balances into outbox commands."""
        for debtor, creditor, amount in self.service.settlements():
            self.outbox.put(SettlementCommand(debtor, creditor, amount))

    def process(self) -> None:
        """Attempt every queued transfer; halt early on a persistently failing one."""
        while not self.outbox.empty():
            command = self.outbox.get()
            attempt = 0
            while attempt < 3:
                try:
                    self.breaker.call(self.gateway.transfer, command)
                    break  # transfer succeeded; move to the next command
                except Exception as exc:
                    attempt += 1
                    backoff = 0.1 * (2 ** attempt)  # exponential backoff
                    print(f"Retrying {command} after {backoff:.2f}s ({exc})")
                    time.sleep(backoff)
            else:
                # All 3 attempts failed: requeue for a future run and stop
                # draining — this `break` exits the outer while loop.
                print("Requeue failed command:", command)
                self.outbox.put(command)
                break
def main() -> None:
    """Level 3 demo: enqueue simplified settlements and drain them resiliently."""
    random.seed(7)
    core = SplitwiseService(BalanceLedger(), Notifier())
    locked = ConcurrentSplitwiseService(core)
    for idx in range(3):
        locked.add_expense(f"group-{idx}", 100 + idx * 20, ["A", "B", "C"])
    coordinator = ResilientSettlementCoordinator(
        locked,
        PaymentGateway(0.5),
        CircuitBreaker(failure_threshold=2, recovery_timeout=0.5),
    )
    coordinator.enqueue_settlements()
    coordinator.process()


if __name__ == "__main__":
    main()
Progressively enhance hotel booking from simple inventory control to concurrent allocation management and a resilient saga-based flow.
Hotel Booking Stack ├─ Level 1: Inventory Repository ├─ Level 2: Concurrent Allocation Locks └─ Level 3: Resilient Microservice Saga
Level 1 — Core Implementation
Design a basic hotel booking service that manages room types, availability, and reservations. Sample Input: reserve("DLX", 2 nights). Sample Output: Reservation confirmation and inventory reduced.
# Fixed imports: `timedelta` was wrongly imported from dataclasses, and
# `date` (used by Reservation and the demo) was never imported at all.
from __future__ import annotations
import threading
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Optional
@dataclass
class RoomType:
    """Room category with its total physical capacity."""

    code: str  # e.g. "DLX"
    total_rooms: int
@dataclass
class Reservation:
    """A confirmed stay for one room of `room_type` from check_in to check_out."""

    reservation_id: str
    room_type: str
    check_in: date
    check_out: date
class RoomInventory:
    """Lock-guarded count of free rooms per room-type code."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.availability: Dict[str, int] = {}

    def add_room_type(self, room_type: RoomType) -> None:
        """Register a room type at full capacity."""
        with self._lock:
            self.availability[room_type.code] = room_type.total_rooms

    def reserve(self, room_type: str, rooms: int) -> None:
        """Take `rooms` rooms, raising RuntimeError('insufficient-inventory') if short."""
        with self._lock:
            on_hand = self.availability.get(room_type, 0)
            if rooms > on_hand:
                raise RuntimeError("insufficient-inventory")
            self.availability[room_type] = on_hand - rooms

    def release(self, room_type: str, rooms: int) -> None:
        """Give rooms back; unknown room types start from zero."""
        with self._lock:
            self.availability[room_type] = self.availability.get(room_type, 0) + rooms

    def snapshot(self) -> Dict[str, int]:
        """Copy of the availability map."""
        with self._lock:
            return dict(self.availability)
class ReservationIdFactory:
    """Generates sequential reservation ids like 'R0001'.

    Fix: guard the counter with a lock — Level 2 calls next_id from several
    booking threads, and a bare `self.counter += 1` is a read-modify-write
    race that can hand out duplicate ids.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.counter = 0

    def next_id(self) -> str:
        """Return the next zero-padded reservation id (thread-safe)."""
        with self._lock:
            self.counter += 1
            return f"R{self.counter:04d}"
class HotelBookingService:
    """Books rooms against inventory and stores reservations by id."""

    def __init__(self, inventory: RoomInventory, id_factory: ReservationIdFactory) -> None:
        self.inventory = inventory
        self.id_factory = id_factory
        self.reservations: Dict[str, Reservation] = {}

    def reserve(self, room_type: str, nights: int, start: date) -> Reservation:
        """Take one room of `room_type` and record a reservation for `nights`."""
        self.inventory.reserve(room_type, 1)
        new_id = self.id_factory.next_id()
        booking = Reservation(new_id, room_type, start, start + timedelta(days=nights))
        self.reservations[new_id] = booking
        return booking
def main() -> None:
    """Level 1 demo: one deluxe reservation out of five rooms."""
    inventory = RoomInventory()
    inventory.add_room_type(RoomType("DLX", 5))
    service = HotelBookingService(inventory, ReservationIdFactory())
    booking = service.reserve("DLX", 2, date.today())
    print("Reservation:", booking)
    print("Remaining DLX:", inventory.availability["DLX"])


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Handle concurrent booking attempts across multiple room types and dates without overbooking. Sample Input: 6 threads booking the same room type. Sample Output: Only available rooms reserved, others rejected gracefully.
from __future__ import annotations
import threading
from collections import defaultdict
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Dict, List, Optional
class AvailabilityCalendar:
    """Per-night availability for one room type, locked per calendar date.

    Locks are always taken in ascending date order, so overlapping stays
    cannot deadlock against each other.
    """

    def __init__(self, room_type: str, total_rooms: int) -> None:
        self.room_type = room_type
        self.total_rooms = total_rooms
        self._locks: Dict[date, threading.Lock] = defaultdict(threading.Lock)
        self._availability: Dict[date, int] = defaultdict(lambda: total_rooms)

    def _stay_dates(self, start: date, nights: int) -> List[date]:
        """Every night of the stay, in ascending order."""
        return [start + timedelta(days=offset) for offset in range(nights)]

    def try_reserve(self, start: date, nights: int) -> bool:
        """Atomically take one room for every night of the stay, or take nothing."""
        stay = self._stay_dates(start, nights)
        held = [self._locks[night] for night in stay]
        for lock in held:
            lock.acquire()
        try:
            if any(self._availability[night] <= 0 for night in stay):
                return False
            for night in stay:
                self._availability[night] -= 1
            return True
        finally:
            for lock in reversed(held):
                lock.release()

    def release(self, start: date, nights: int) -> None:
        """Give back one room for every night of the stay."""
        stay = self._stay_dates(start, nights)
        held = [self._locks[night] for night in stay]
        for lock in held:
            lock.acquire()
        try:
            for night in stay:
                self._availability[night] += 1
        finally:
            for lock in reversed(held):
                lock.release()
class ConcurrentHotelBookingService:
    """Couples the booking service with a per-night calendar for safe concurrency."""

    def __init__(self, service: HotelBookingService, calendar: AvailabilityCalendar) -> None:
        self.service = service
        self.calendar = calendar

    def reserve(self, guest: str, room_type: str, nights: int, start: date) -> Optional[Reservation]:
        """Claim calendar capacity first; only then create the reservation."""
        granted = self.calendar.try_reserve(start, nights)
        if not granted:
            print(f"{guest} reservation failed")
            return None
        reservation = self.service.reserve(room_type, nights, start)
        print(f"{guest} reservation succeeded: {reservation.reservation_id}")
        return reservation

    def release(self, reservation: Reservation) -> None:
        """Undo a reservation: calendar nights, inventory count, and the record."""
        nights = (reservation.check_out - reservation.check_in).days
        self.calendar.release(reservation.check_in, nights)
        self.service.inventory.release(reservation.room_type, 1)
        self.service.reservations.pop(reservation.reservation_id, None)
def worker(service: ConcurrentHotelBookingService, guest: str) -> None:
    """Each worker attempts a two-night deluxe stay starting today."""
    service.reserve(guest, "DLX", 2, date.today())
def main() -> None:
    """Level 2 demo: five guests race for three deluxe rooms."""
    inventory = RoomInventory()
    inventory.add_room_type(RoomType("DLX", 3))
    booking_service = HotelBookingService(inventory, ReservationIdFactory())
    concurrent_service = ConcurrentHotelBookingService(
        booking_service, AvailabilityCalendar("DLX", total_rooms=3)
    )
    guests = [threading.Thread(target=worker, args=(concurrent_service, f"G{i}")) for i in range(5)]
    for thread in guests:
        thread.start()
    for thread in guests:
        thread.join()


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Keep the booking flow operating even when pricing or payment subsystems fail by orchestrating a saga with fallback rates and compensation steps. Sample Input: 4 reservations with flaky pricing and payment. Sample Output: Successful reservations or graceful fallback with compensation logs.
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from datetime import date
from typing import List, Optional
@dataclass
class BookingCommand:
    """Input to the booking saga: who wants which room type for how long."""

    guest: str
    room_type: str
    nights: int
class PricingService:
    """Quotes nightly rates; fails randomly to exercise the fallback path."""

    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate

    def quote(self, room_type: str, nights: int) -> float:
        """Return nightly-rate * nights, or raise RuntimeError('pricing-down')."""
        if random.random() < self.failure_rate:
            raise RuntimeError("pricing-down")
        nightly = 120 if room_type == "DLX" else 90
        return nightly * nights
class PaymentService:
    """Charges guests; randomly declines to exercise saga compensation."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def charge(self, guest: str, amount: float) -> None:
        """Charge `guest`, or raise RuntimeError('payment-failed')."""
        if random.random() < self.failure_rate:
            raise RuntimeError("payment-failed")
        print(f"Charged {guest} amount {amount}")
class SimpleCircuitBreaker:
    """Minimal (single-threaded) circuit breaker with a cool-down window.

    Fix over the original: after the cool-down expires the first call is a
    probe — if that probe fails the circuit re-opens immediately instead of
    needing `threshold` fresh failures again.
    """

    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold  # consecutive failures before opening
        self.cool_down = cool_down  # seconds to stay open
        self.failures = 0
        self.open_until = 0.0  # epoch seconds; 0.0 means fully closed

    def call(self, func, *args, **kwargs):
        """Invoke `func`, raising RuntimeError('circuit-open') while tripped."""
        now = time.time()
        if now < self.open_until:
            raise RuntimeError("circuit-open")
        # open_until > 0 here means the cool-down just expired: half-open probe.
        probing = self.open_until > 0.0
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if probing or self.failures >= self.threshold:
                self.open_until = time.time() + self.cool_down
                self.failures = 0
            raise
        self.failures = 0
        self.open_until = 0.0  # success fully closes the circuit
        return result
class BookingSagaCoordinator:
    """Runs quote -> reserve -> charge, compensating the reservation on payment failure."""

    def __init__(self, booking: ConcurrentHotelBookingService, pricing: PricingService, payment: PaymentService) -> None:
        self.booking = booking
        self.pricing = pricing
        self.payment = payment
        self.breaker = SimpleCircuitBreaker(threshold=2, cool_down=1.0)

    def execute(self, command: BookingCommand) -> None:
        """Process one booking command end to end, logging each outcome."""
        try:
            price = self.breaker.call(self.pricing.quote, command.room_type, command.nights)
        except Exception:
            # Pricing unavailable: fall back to a flat nightly rate.
            price = 100.0 * command.nights
            print("Pricing fallback for", command.guest)
        reservation = self.booking.reserve(command.guest, command.room_type, command.nights, date.today())
        if reservation is None:
            print("Reservation failed for", command.guest)
            return
        try:
            self.payment.charge(command.guest, price)
        except Exception as exc:
            # Compensation: payment failed, so give the rooms back.
            print(f"Payment failed for {command.guest}: {exc}")
            self.booking.release(reservation)
        else:
            print(f"Reservation {reservation.reservation_id} confirmed for {command.guest}")
def main() -> None:
    """Level 3 demo: four commands through the pricing/payment saga."""
    random.seed(9)
    inventory = RoomInventory()
    inventory.add_room_type(RoomType("DLX", 3))
    hotel = HotelBookingService(inventory, ReservationIdFactory())
    concurrent = ConcurrentHotelBookingService(hotel, AvailabilityCalendar("DLX", total_rooms=3))
    saga = BookingSagaCoordinator(concurrent, PricingService(0.4), PaymentService(0.5))
    for command in (
        BookingCommand("Alice", "DLX", 2),
        BookingCommand("Bob", "DLX", 3),
        BookingCommand("Carol", "DLX", 1),
        BookingCommand("Dave", "DLX", 2),
    ):
        saga.execute(command)


if __name__ == "__main__":
    main()
Grow the movie ticketing service from layout modeling to seat hold concurrency and resilient payment orchestration.
Movie Ticketing Stack ├─ Level 1: Theatre Layout Builder ├─ Level 2: Concurrent Hold/Release └─ Level 3: Resilient Payment Orchestration
Level 1 — Core Implementation
Model a movie theatre with seat categories and enable seat holds before payment. Sample Input: hold 3 seats in row A. Sample Output: Seats marked on hold and available seats reduced.
from __future__ import annotations
from dataclasses import dataclass
import threading
from typing import Dict, List, Optional, Tuple
@dataclass
class SeatCell:
    """One physical seat; `status` is AVAILABLE, HOLD, or BOOKED."""

    row: str
    number: int
    category: str
    status: str = "AVAILABLE"
class TheatreLayout:
    """Lock-guarded grid of SeatCells keyed by row letter, then seat number."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self.grid: Dict[str, Dict[int, SeatCell]] = {}

    def add_row(self, row: str, count: int, category: str) -> None:
        """Create `count` seats numbered from 1 in the given row and category."""
        with self._lock:
            self.grid[row] = {
                number: SeatCell(row, number, category) for number in range(1, count + 1)
            }

    def available(self, category: str) -> List[SeatCell]:
        """All AVAILABLE seats of `category`, row by row."""
        with self._lock:
            found: List[SeatCell] = []
            for row_seats in self.grid.values():
                for seat in row_seats.values():
                    if seat.category == category and seat.status == "AVAILABLE":
                        found.append(seat)
            return found

    def mark(self, seats: List[Tuple[str, int]], status: str) -> None:
        """Set `status` on every (row, number) pair."""
        with self._lock:
            for row, number in seats:
                self.grid[row][number].status = status
class SeatHoldService:
    """Places, releases, and confirms holds on layout seats."""

    def __init__(self, layout: TheatreLayout) -> None:
        self.layout = layout

    def hold(self, category: str, quantity: int) -> List[Tuple[str, int]]:
        """Hold the first `quantity` available seats; raise if too few exist."""
        candidates = self.layout.available(category)
        if len(candidates) < quantity:
            raise RuntimeError("not enough seats")
        chosen = [(seat.row, seat.number) for seat in candidates[:quantity]]
        self.layout.mark(chosen, "HOLD")
        return chosen

    def release(self, seats: List[Tuple[str, int]]) -> None:
        """Put held seats back on sale."""
        self.layout.mark(seats, "AVAILABLE")

    def confirm(self, seats: List[Tuple[str, int]]) -> None:
        """Finalize held seats as booked."""
        self.layout.mark(seats, "BOOKED")
def main() -> None:
    """Level 1 demo: hold three premium seats and show the remainder."""
    layout = TheatreLayout()
    for row, category in (("A", "PREMIUM"), ("B", "PREMIUM"), ("C", "REGULAR")):
        layout.add_row(row, 10, category)
    hold_service = SeatHoldService(layout)
    print("Held seats:", hold_service.hold("PREMIUM", 3))
    print("Remaining premium:", len(layout.available("PREMIUM")))


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Handle concurrent seat hold requests using timed expiry and conditions to wake waiting customers. Sample Input: 5 threads holding overlapping seats. Sample Output: Holds granted or rejected without conflicts, expired holds released.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
@dataclass
class SeatHoldTicket:
    """A temporary claim on seats that lapses at `expires_at` (epoch seconds)."""

    hold_id: str
    seats: List[Tuple[str, int]]  # (row, number) pairs covered by the hold
    customer: str
    expires_at: float
class ConcurrentSeatHoldService:
    """Wraps SeatHoldService with TTL-limited holds and a background reaper.

    All state transitions happen under one RLock; a daemon thread wakes every
    ttl/2 seconds and releases expired holds back to the layout.
    """

    def __init__(self, hold_service: SeatHoldService, ttl: float = 1.5) -> None:
        self.hold_service = hold_service
        self.ttl = ttl  # seconds a hold stays valid
        self._holds: Dict[str, SeatHoldTicket] = {}
        self._lock = threading.RLock()
        self._counter = 0
        # Daemon reaper dies with the process; no explicit shutdown needed.
        threading.Thread(target=self._reaper, daemon=True).start()

    def _next_id(self) -> str:
        # RLock allows re-entry from hold(), which already holds the lock.
        with self._lock:
            self._counter += 1
            return f"H{self._counter:04d}"

    def _purge_expired(self) -> None:
        # Caller must hold self._lock.
        now = time.time()
        expired = [hold_id for hold_id, ticket in self._holds.items() if ticket.expires_at <= now]
        for hold_id in expired:
            ticket = self._holds.pop(hold_id)
            self.hold_service.release(ticket.seats)
            print("Expired hold released:", hold_id)

    def _reaper(self) -> None:
        # Background loop: sample at half the TTL so holds expire promptly.
        while True:
            time.sleep(self.ttl / 2)
            with self._lock:
                self._purge_expired()

    def hold(self, category: str, quantity: int, customer: str) -> Optional[SeatHoldTicket]:
        """Try to place a hold; returns None when not enough seats are free."""
        with self._lock:
            self._purge_expired()  # free anything stale before trying
            try:
                seats = self.hold_service.hold(category, quantity)
            except RuntimeError:
                return None
            ticket = SeatHoldTicket(self._next_id(), seats, customer, time.time() + self.ttl)
            self._holds[ticket.hold_id] = ticket
            return ticket

    def release(self, hold_id: str) -> None:
        """Voluntarily give up a hold (no-op for unknown or already-expired ids)."""
        with self._lock:
            ticket = self._holds.pop(hold_id, None)
            if ticket:
                self.hold_service.release(ticket.seats)

    def confirm(self, hold_id: str) -> Optional[List[Tuple[str, int]]]:
        """Convert a live hold into a booking; None if it already expired."""
        with self._lock:
            ticket = self._holds.pop(hold_id, None)
            if not ticket:
                return None
            self.hold_service.confirm(ticket.seats)
            return ticket.seats
def worker(service: ConcurrentSeatHoldService, customer: str) -> None:
    """Hold one premium seat, report the outcome, then release after a pause."""
    ticket = service.hold("PREMIUM", 1, customer)
    print(customer, "hold ->", bool(ticket))
    time.sleep(0.5)
    if ticket is None:
        return
    service.release(ticket.hold_id)
def main() -> None:
    """Wire up a tiny premium row and race four customers for its seats."""
    layout = TheatreLayout()
    layout.add_row("A", 3, "PREMIUM")
    hold_service = SeatHoldService(layout)
    concurrent_service = ConcurrentSeatHoldService(hold_service, ttl=0.8)
    customers = [
        threading.Thread(target=worker, args=(concurrent_service, f"C{i}"))
        for i in range(4)
    ]
    for thread in customers:
        thread.start()
    for thread in customers:
        thread.join()


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Complete ticket booking with payment retries and compensation to release seats if payment fails. Sample Input: 4 bookings with flaky payment gateway. Sample Output: Successful transactions confirmed; failures trigger seat release and waitlist placement.
from __future__ import annotations
import random
import threading
import time
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class TicketOrder:
    """A customer's request to buy tickets in a given seating category."""

    customer: str   # customer name used for charging and logging
    category: str   # seating category, e.g. "PREMIUM" or "REGULAR"
    quantity: int   # number of seats requested
class TicketPricingService:
    """Quotes ticket prices; fails randomly to simulate an unreliable backend."""

    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate

    def quote(self, category: str, quantity: int) -> float:
        """Return the total price, raising RuntimeError on a simulated outage."""
        if random.random() < self.failure_rate:
            raise RuntimeError("pricing-down")
        per_seat = 150 if category == "PREMIUM" else 90
        return per_seat * quantity
class TicketPaymentService:
    """Charges customers; fails randomly to simulate a flaky payment gateway."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def charge(self, customer: str, amount: float) -> None:
        """Charge `amount` to `customer`, raising RuntimeError on gateway failure."""
        if random.random() < self.failure_rate:
            raise RuntimeError("payment-failed")
        print(f"Charged {customer} amount {amount}")
class TicketCircuitBreaker:
    """Thread-safe breaker: opens after `threshold` consecutive failures and
    rejects calls for `cool_down` seconds."""

    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold
        self.cool_down = cool_down
        self.failures = 0
        self.open_until = 0.0
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        """Invoke `func`; raise RuntimeError("circuit-open") while the breaker is open."""
        # Fast-fail while open; the lock guards breaker state, not the call itself.
        with self.lock:
            if time.time() < self.open_until:
                raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            with self.lock:
                self.failures += 1
                if self.failures >= self.threshold:
                    # Trip the breaker and start a fresh failure count.
                    self.open_until = time.time() + self.cool_down
                    self.failures = 0
            raise
        with self.lock:
            # Any success fully resets the breaker.
            self.failures = 0
            self.open_until = 0.0
        return result
class TicketingSaga:
    """Orchestrates hold -> price -> charge -> confirm, compensating on failure."""

    def __init__(self,
                 hold_service: ConcurrentSeatHoldService,
                 pricing: TicketPricingService,
                 payment: TicketPaymentService,
                 breaker: TicketCircuitBreaker) -> None:
        self.hold_service = hold_service
        self.pricing = pricing
        self.payment = payment
        self.breaker = breaker

    def execute(self, order: TicketOrder) -> None:
        """Run the booking saga for one order, releasing seats on payment failure."""
        ticket = self.hold_service.hold(order.category, order.quantity, order.customer)
        if not ticket:
            print(order.customer, "could not obtain hold")
            return
        # Price through the breaker; fall back to a flat rate when pricing is down.
        try:
            amount = self.breaker.call(self.pricing.quote, order.category, order.quantity)
        except Exception:
            amount = 100.0 * order.quantity
            print("Pricing fallback used for", order.customer)
        # Charge then confirm; compensate by releasing the hold on any failure.
        try:
            self.payment.charge(order.customer, amount)
            seats = self.hold_service.confirm(ticket.hold_id)
            print(f"Order confirmed for {order.customer}: {seats}")
        except Exception as exc:
            print(f"Payment failed for {order.customer}: {exc}")
            self.hold_service.release(ticket.hold_id)
def main() -> None:
    """Demo: run four booking sagas against flaky pricing/payment services."""
    random.seed(11)
    layout = TheatreLayout()
    layout.add_row("A", 4, "PREMIUM")
    layout.add_row("B", 4, "REGULAR")
    base_hold_service = SeatHoldService(layout)
    saga = TicketingSaga(
        hold_service=ConcurrentSeatHoldService(base_hold_service, ttl=1.0),
        pricing=TicketPricingService(0.4),
        payment=TicketPaymentService(0.5),
        breaker=TicketCircuitBreaker(2, 0.8),
    )
    for order in (
        TicketOrder("Alice", "PREMIUM", 2),
        TicketOrder("Bob", "PREMIUM", 2),
        TicketOrder("Carol", "REGULAR", 2),
        TicketOrder("Dave", "PREMIUM", 1),
    ):
        saga.execute(order)


if __name__ == "__main__":
    main()
Enhance the caching service from a basic LRU map to a concurrent implementation and finally a resilient multi-tier architecture.
Cache Hierarchy ├─ Level 1: LRU Map ├─ Level 2: Concurrent LRU └─ Level 3: Resilient Multi-Tier Cache
Level 1 — Core Implementation
Implement an LRU cache supporting O(1) get and put operations with eviction of least recently used keys. Sample Input: put(1,"A"), put(2,"B"), get(1), put(3,"C"). Sample Output: Key 2 evicted, cache contains keys 1 and 3.
from collections import OrderedDict
import threading
from typing import Optional
class LRUCache:
    """Thread-safe LRU cache: the OrderedDict's front is always the eviction victim."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.store: OrderedDict[int, str] = OrderedDict()
        self.lock = threading.RLock()

    def get(self, key: int) -> Optional[str]:
        """Return the cached value and mark `key` most-recently-used, or None."""
        with self.lock:
            if key not in self.store:
                return None
            self.store.move_to_end(key)
            return self.store[key]

    def put(self, key: int, value: str) -> None:
        """Insert or refresh `key`, evicting the least-recently-used entry if full."""
        with self.lock:
            self.store[key] = value
            self.store.move_to_end(key)
            if len(self.store) > self.capacity:
                self.store.popitem(last=False)

    def items(self):
        """Snapshot of (key, value) pairs from least- to most-recently used."""
        with self.lock:
            return list(self.store.items())
def main() -> None:
    """Exercise the LRU: key 2 is evicted once 1 is refreshed and 3 inserted."""
    cache = LRUCache(2)
    for key, value in ((1, "A"), (2, "B")):
        cache.put(key, value)
    print("Get 1:", cache.get(1))
    cache.put(3, "C")
    print("Cache items:", cache.items())


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Extend the LRU cache to support concurrent reads and writes by segmenting the cache with locks. Sample Input: 6 threads performing mixed operations. Sample Output: Cache remains consistent with correct LRU behavior.
from __future__ import annotations
import threading
from typing import Dict
class SegmentedLRUCache:
    """Shards keys across independent LRUCache segments to reduce lock contention."""

    def __init__(self, shard_count: int, capacity_per_shard: int) -> None:
        # One independently-locked LRU per shard index.
        self.shards: Dict[int, LRUCache] = {
            idx: LRUCache(capacity_per_shard) for idx in range(shard_count)
        }

    def _shard(self, key: int) -> LRUCache:
        """Pick the shard owning `key` by simple modulo hashing."""
        return self.shards[key % len(self.shards)]

    def get(self, key: int):
        return self._shard(key).get(key)

    def put(self, key: int, value: str) -> None:
        self._shard(key).put(key, value)

    def snapshot(self):
        """Per-shard list of items, for inspection and debugging."""
        return {idx: shard.items() for idx, shard in self.shards.items()}
def worker(cache: SegmentedLRUCache, idx: int) -> None:
    """Demo worker: write one entry then immediately read it back."""
    cache.put(idx, f"V{idx}")
    cache.get(idx)
def main() -> None:
    """Hammer a 2-shard cache from six threads and print the final state."""
    cache = SegmentedLRUCache(2, 2)
    workers = [threading.Thread(target=worker, args=(cache, i)) for i in range(6)]
    for thread in workers:
        thread.start()
    for thread in workers:
        thread.join()
    print("Snapshot:", cache.snapshot())


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Implement a two-tier cache with write-through to a flaky datastore and fallback to stale data when the datastore is unavailable. Sample Input: Sequence of get/put with datastore failures. Sample Output: Cache serves data, logs fallbacks, and replays writes.
from __future__ import annotations
import random
import time
from typing import Dict, Optional
class FlakyDatastore:
    """In-memory key-value store that randomly fails to simulate outages."""

    def __init__(self, failure_rate: float = 0.3) -> None:
        self.failure_rate = failure_rate
        self.store: Dict[int, str] = {}

    def _maybe_fail(self) -> None:
        # Simulated outage: each call has an independent chance of failing.
        if random.random() < self.failure_rate:
            raise RuntimeError("datastore-down")

    def get(self, key: int) -> Optional[str]:
        """Return the stored value (or None), or raise on a simulated outage."""
        self._maybe_fail()
        return self.store.get(key)

    def put(self, key: int, value: str) -> None:
        """Store the value, or raise on a simulated outage."""
        self._maybe_fail()
        self.store[key] = value
class CircuitBreaker:
    """Single-threaded circuit breaker guarding datastore calls."""

    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold    # consecutive failures before opening
        self.cool_down = cool_down    # seconds the breaker stays open
        self.failures = 0
        self.open_until = 0.0

    def call(self, func, *args, **kwargs):
        """Invoke `func`; raise RuntimeError("circuit-open") while the breaker is open."""
        now = time.time()
        if now < self.open_until:
            raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            self.failures += 1
            if self.failures >= self.threshold:
                # Trip: reject calls until the cool-down elapses.
                self.open_until = now + self.cool_down
                self.failures = 0
            raise exc
        # Any success resets the consecutive-failure count.
        self.failures = 0
        return result
class TieredCache:
    """Two-tier read-through cache (small hot L1 over a larger L2) with a
    write-through datastore guarded by a circuit breaker."""

    def __init__(self, datastore: FlakyDatastore, l1_capacity: int = 2, l2_capacity: int = 4) -> None:
        self.l1 = LRUCache(l1_capacity)   # hot tier
        self.l2 = LRUCache(l2_capacity)   # warm tier
        self.datastore = datastore
        self.breaker = CircuitBreaker(2, 1.0)

    def get(self, key: int) -> Optional[str]:
        """Look up `key` in L1, then L2 (promoting hits), then the datastore."""
        value = self.l1.get(key)
        if value is not None:
            return value
        value = self.l2.get(key)
        if value is not None:
            # Promote warm hits into the hot tier.
            self.l1.put(key, value)
            return value
        try:
            value = self.breaker.call(self.datastore.get, key)
            if value is not None:
                # Populate both tiers on a successful datastore read.
                self.l2.put(key, value)
                self.l1.put(key, value)
            return value
        except Exception as exc:
            print("Serving stale due to", exc)
            # NOTE(review): this point is only reached after L2 already missed,
            # so the "stale" fallback always returns None in practice; a real
            # stale tier would need storage that survives L2 eviction.
            return self.l2.get(key)

    def put(self, key: int, value: str) -> None:
        """Write to both cache tiers, then write-through to the datastore."""
        self.l1.put(key, value)
        self.l2.put(key, value)
        try:
            self.breaker.call(self.datastore.put, key, value)
        except Exception as exc:
            # Best-effort write-through: the caches stay authoritative, but the
            # deferred write is not actually replayed later.
            print("Write deferred due to", exc)
def main() -> None:
    """Demo the tiered cache against a datastore that fails half the time."""
    random.seed(8)
    cache = TieredCache(FlakyDatastore(0.5))
    cache.put(1, "A")
    cache.put(2, "B")
    for key in (1, 2, 3):
        print(f"Get {key}:", cache.get(key))


if __name__ == "__main__":
    main()
Progressively enhance the elevator system from basic control to coordinated multi-elevator scheduling and resilient health-aware dispatch.
Elevator Control Stack ├─ Level 1: Basic Controller ├─ Level 2: Multi-Elevator Dispatch └─ Level 3: Resilient Dispatch with Health Checks
Level 1 — Eventful Single Cab
Build a single-elevator controller that applies strategy + state machine patterns and emits lifecycle events through an EventBus.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
@dataclass
class CallRequest:
    """A hall call: the floor a rider is on and the direction they want."""

    floor: int
    direction: str = "IDLE"  # "UP", "DOWN", or "IDLE" when unspecified
class EventBus:
    """Minimal synchronous pub/sub bus: handlers run inline during publish()."""

    def __init__(self) -> None:
        self._subscribers: Dict[str, list[Callable[[dict], None]]] = {}

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        """Register `handler` to be invoked for every `event` published."""
        self._subscribers.setdefault(event, []).append(handler)

    def publish(self, event: str, payload: dict) -> None:
        """Invoke all handlers subscribed to `event`, in registration order."""
        for subscriber in self._subscribers.get(event, []):
            subscriber(payload)
class StateMachine:
    """Tiny explicit-transition state machine; self-transitions are always legal."""

    def __init__(self, initial_state: str):
        self.state = initial_state
        self._transitions: Dict[str, set[str]] = {}

    def add_transition(self, origin: str, target: str) -> None:
        """Declare that `origin` may move to `target`."""
        self._transitions.setdefault(origin, set()).add(target)

    def transition(self, target: str) -> None:
        """Move to `target`, raising RuntimeError for undeclared transitions."""
        legal = target == self.state or target in self._transitions.get(self.state, set())
        if not legal:
            raise RuntimeError(f"Invalid transition {self.state} → {target}")
        self.state = target
class ElevatorCab(StateMachine):
    """Single elevator car: serves its floor queue one step per call to step().

    Emits cab.queue / cab.status / cab.arrived events on the bus.
    """

    def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
        super().__init__("IDLE")
        self.cab_id = cab_id
        self.max_floor = max_floor
        self.current_floor = 1
        self.queue: List[int] = []   # pending target floors, FIFO
        self.bus = bus
        self.add_transition("IDLE", "MOVING")
        self.add_transition("MOVING", "DOORS_OPEN")
        # BUGFIX: a request for the cab's current floor opens the doors straight
        # from IDLE; without this transition step() raised
        # "Invalid transition IDLE → DOORS_OPEN".
        self.add_transition("IDLE", "DOORS_OPEN")
        self.add_transition("DOORS_OPEN", "IDLE")

    def enqueue(self, floor: int) -> None:
        """Queue a stop at `floor`; out-of-range floors raise ValueError."""
        if not 1 <= floor <= self.max_floor:
            raise ValueError(f"Floor {floor} outside bounds")
        if floor not in self.queue:
            self.queue.append(floor)
        self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})

    def step(self) -> None:
        """Advance one tick: move one floor toward the head of the queue, or
        open doors on arrival, then publish the resulting status."""
        if not self.queue:
            self.transition("IDLE")
            self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
            return
        target = self.queue[0]
        if self.current_floor < target:
            self.current_floor += 1
            self.transition("MOVING")
        elif self.current_floor > target:
            self.current_floor -= 1
            self.transition("MOVING")
        else:
            # Arrived: open doors, announce, then settle back to IDLE.
            self.queue.pop(0)
            self.transition("DOORS_OPEN")
            self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
            self.transition("IDLE")
        self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
class DispatchStrategy:
    """Trivial single-cab strategy: the only cab always wins."""

    def choose(self, cab: ElevatorCab, request: CallRequest) -> ElevatorCab:
        """Return the cab that should serve `request` (always `cab` here)."""
        return cab
class ElevatorController:
    """Routes hall calls to the single cab via the event bus."""

    def __init__(self, cab: ElevatorCab, strategy: DispatchStrategy, bus: EventBus):
        self.cab = cab
        self.strategy = strategy
        self.bus = bus
        # React to hall calls on the bus (including those we publish ourselves).
        self.bus.subscribe("call.requested", self._handle_request)

    def request(self, floor: int, direction: str = "IDLE") -> None:
        """Publish a hall call for `floor`; handled synchronously by the bus."""
        self.bus.publish("call.requested", {"request": CallRequest(floor, direction)})

    def _handle_request(self, payload: dict) -> None:
        call: CallRequest = payload["request"]
        chosen = self.strategy.choose(self.cab, call)
        chosen.enqueue(call.floor)
        self.bus.publish("call.assigned", {"cab": chosen.cab_id, "floor": call.floor})

    def tick(self) -> None:
        """Advance the cab by one simulation step."""
        self.cab.step()
def main() -> None:
    """Demo: one cab on 12 floors serving two hall calls."""
    bus = EventBus()
    bus.subscribe("cab.status", lambda event: print("[status]", event))
    bus.subscribe("cab.arrived", lambda event: print("[arrived]", event))
    cab = ElevatorCab("CAR-1", max_floor=12, bus=bus)
    controller = ElevatorController(cab, DispatchStrategy(), bus)
    controller.request(7, "UP")
    controller.request(3, "DOWN")
    for _ in range(8):
        controller.tick()


if __name__ == "__main__":
    main()
Level 2 — Fleet Dispatch Bus
Coordinate a multi-car fleet via an EventBus, scheduling with command + strategy patterns and projecting assignments without threads.
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Protocol, Tuple
@dataclass
class CallRequest:
    """A hall call: requested floor plus travel direction ("UP"/"DOWN")."""

    floor: int
    direction: str
class EventBus:
    """Queued pub/sub bus: publish() only enqueues; pump() delivers FIFO."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        """Register `handler` for `event`."""
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        """Enqueue an event; nothing is delivered until pump() runs."""
        self._queue.append((event, payload))

    def pump(self) -> None:
        """Drain the queue, delivering each event to a snapshot of its handlers.

        Handlers may publish further events; those are processed in this drain.
        """
        while self._queue:
            event, payload = self._queue.popleft()
            for subscriber in list(self._subscribers.get(event, [])):
                subscriber(payload)
class StateMachine:
    """Explicit-transition state machine; staying in the current state is free."""

    def __init__(self, initial_state: str):
        self.state = initial_state
        self._transitions: Dict[str, set[str]] = {}

    def add_transition(self, origin: str, target: str) -> None:
        """Declare that `origin` may move to `target`."""
        self._transitions.setdefault(origin, set()).add(target)

    def transition(self, target: str) -> None:
        """Move to `target`; undeclared transitions raise RuntimeError."""
        if target != self.state:
            if target not in self._transitions.get(self.state, set()):
                raise RuntimeError(f"Invalid transition {self.state} → {target}")
        self.state = target
class ElevatorCab(StateMachine):
    """Fleet elevator car: serves its floor queue one step per call to step().

    Emits cab.queue / cab.status / cab.arrived events on the bus.
    """

    def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
        super().__init__("IDLE")
        self.cab_id = cab_id
        self.max_floor = max_floor
        self.current_floor = 1
        self.queue: List[int] = []   # pending target floors, FIFO
        self.bus = bus
        self.add_transition("IDLE", "MOVING")
        self.add_transition("MOVING", "DOORS_OPEN")
        # BUGFIX: a request for the cab's current floor opens the doors straight
        # from IDLE; without this transition step() raised
        # "Invalid transition IDLE → DOORS_OPEN".
        self.add_transition("IDLE", "DOORS_OPEN")
        self.add_transition("DOORS_OPEN", "IDLE")

    def enqueue(self, floor: int) -> None:
        """Queue a stop at `floor`; out-of-range floors raise ValueError."""
        if not 1 <= floor <= self.max_floor:
            raise ValueError(f"Floor {floor} outside bounds")
        if floor not in self.queue:
            self.queue.append(floor)
        self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})

    def step(self) -> None:
        """Advance one tick: move one floor toward the head of the queue, or
        open doors on arrival, then publish the resulting status."""
        if not self.queue:
            self.transition("IDLE")
            self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
            return
        target = self.queue[0]
        if self.current_floor < target:
            self.current_floor += 1
            self.transition("MOVING")
        elif self.current_floor > target:
            self.current_floor -= 1
            self.transition("MOVING")
        else:
            # Arrived: open doors, announce, then settle back to IDLE.
            self.queue.pop(0)
            self.transition("DOORS_OPEN")
            self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
            self.transition("IDLE")
        self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
class DispatchStrategy:
    """Picks the best cab: closest first, then least loaded, then direction-
    compatible, with cab id as a deterministic tie-break."""

    def choose(self, request: CallRequest, cabs: Iterable[ElevatorCab]) -> ElevatorCab:
        def score(cab: ElevatorCab) -> Tuple[int, int, int, str]:
            distance = abs(cab.current_floor - request.floor)
            heading_ok = (
                (request.direction == "UP" and cab.current_floor <= request.floor)
                or (request.direction == "DOWN" and cab.current_floor >= request.floor)
            )
            return (distance, len(cab.queue), 0 if heading_ok else 1, cab.cab_id)

        return min(cabs, key=score)
class Command(Protocol):
    """Structural interface for executable commands (command pattern)."""

    def execute(self) -> None:
        ...
class RequestElevatorCommand:
    """Command that submits a hall call by publishing it on the bus."""

    def __init__(self, bus: EventBus, request: CallRequest):
        self.bus = bus
        self.request = request

    def execute(self) -> None:
        """Publish the wrapped request as a "call.received" event."""
        self.bus.publish("call.received", {"request": self.request})
class ElevatorFleetController:
    """Assigns bus-delivered hall calls to fleet cabs via the dispatch strategy."""

    def __init__(self, cabs: List[ElevatorCab], strategy: DispatchStrategy, bus: EventBus):
        self.cabs = {cab.cab_id: cab for cab in cabs}
        self.strategy = strategy
        self.bus = bus
        self.bus.subscribe("call.received", self._assign_call)

    def request(self, floor: int, direction: str) -> Command:
        """Build (but do not execute) a command for a new hall call."""
        return RequestElevatorCommand(self.bus, CallRequest(floor, direction))

    def _assign_call(self, payload: dict) -> None:
        call: CallRequest = payload["request"]
        winner = self.strategy.choose(call, self.cabs.values())
        winner.enqueue(call.floor)
        self.bus.publish("call.assigned", {"cab": winner.cab_id, "floor": call.floor, "direction": call.direction})

    def step_all(self) -> None:
        """Advance every cab by one simulation tick."""
        for cab in self.cabs.values():
            cab.step()
class AssignmentBoard:
    """Read-model projection tracking outstanding assignments and served stops."""

    def __init__(self, bus: EventBus) -> None:
        self.assignments: Dict[str, List[int]] = defaultdict(list)   # cab -> pending floors
        self.served: List[Tuple[str, int]] = []                      # (cab, floor) history
        bus.subscribe("call.assigned", self._on_assigned)
        bus.subscribe("cab.arrived", self._on_arrived)

    def _on_assigned(self, payload: dict) -> None:
        cab, floor = payload["cab"], payload["floor"]
        pending = self.assignments[cab]
        if floor not in pending:
            pending.append(floor)

    def _on_arrived(self, payload: dict) -> None:
        cab, floor = payload["cab"], payload["floor"]
        self.served.append((cab, floor))
        if floor in self.assignments.get(cab, []):
            self.assignments[cab].remove(floor)
def main() -> None:
    """Demo: three cabs serving five hall calls, with an assignment projection."""
    bus = EventBus()
    cabs = [ElevatorCab(name, max_floor=20, bus=bus) for name in ("CAR-A", "CAR-B", "CAR-C")]
    controller = ElevatorFleetController(cabs, DispatchStrategy(), bus)
    board = AssignmentBoard(bus)
    for floor, direction in [(7, "UP"), (3, "DOWN"), (14, "DOWN"), (9, "UP"), (2, "DOWN")]:
        controller.request(floor, direction).execute()
    bus.pump()
    for _ in range(12):
        controller.step_all()
        bus.pump()
    print("Outstanding assignments:", board.assignments)
    print("Served stops:", board.served)


if __name__ == "__main__":
    main()
Level 3 — Resilient Fleet Orchestrator
Layer health monitoring, repository-backed failover, and saga-driven requeueing over the event-driven fleet without threads.
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Callable, Deque, DefaultDict, Dict, Iterable, List, Protocol, Tuple
@dataclass
class CallRequest:
    """A hall call: requested floor and direction (defaults to "IDLE")."""

    floor: int
    direction: str = "IDLE"
class EventBus:
    """Deferred-delivery pub/sub bus: publish() queues, pump() drains in order."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
        self._queue: Deque[Tuple[str, dict]] = deque()

    def subscribe(self, event: str, handler: Callable[[dict], None]) -> None:
        """Register `handler` for `event`."""
        self._subscribers[event].append(handler)

    def publish(self, event: str, payload: dict) -> None:
        """Queue an event for the next pump() cycle."""
        self._queue.append((event, payload))

    def pump(self) -> None:
        """Deliver queued events FIFO; events published by handlers are also
        drained before pump() returns."""
        while self._queue:
            event, payload = self._queue.popleft()
            # Snapshot the handler list so subscriptions made mid-delivery
            # do not mutate the sequence being iterated.
            for subscriber in list(self._subscribers.get(event, [])):
                subscriber(payload)
class StateMachine:
    """Explicit-transition state machine; remaining in place is always allowed."""

    def __init__(self, initial_state: str):
        self.state = initial_state
        self._transitions: Dict[str, set[str]] = {}

    def add_transition(self, origin: str, target: str) -> None:
        """Declare that `origin` may move to `target`."""
        self._transitions.setdefault(origin, set()).add(target)

    def transition(self, target: str) -> None:
        """Move to `target`; undeclared transitions raise RuntimeError."""
        permitted = target == self.state or target in self._transitions.get(self.state, set())
        if not permitted:
            raise RuntimeError(f"Invalid transition {self.state} → {target}")
        self.state = target
class ElevatorCab(StateMachine):
    """Resilient-fleet elevator car: serves its queue one step per call to step().

    Emits cab.queue / cab.status / cab.arrived events on the bus.
    """

    def __init__(self, cab_id: str, max_floor: int, bus: EventBus):
        super().__init__("IDLE")
        self.cab_id = cab_id
        self.max_floor = max_floor
        self.current_floor = 1
        self.queue: List[int] = []   # pending target floors, FIFO
        self.bus = bus
        self.add_transition("IDLE", "MOVING")
        self.add_transition("MOVING", "DOORS_OPEN")
        # BUGFIX: a request for the cab's current floor opens the doors straight
        # from IDLE; without this transition step() raised
        # "Invalid transition IDLE → DOORS_OPEN".
        self.add_transition("IDLE", "DOORS_OPEN")
        self.add_transition("DOORS_OPEN", "IDLE")

    def enqueue(self, floor: int) -> None:
        """Queue a stop at `floor`; out-of-range floors raise ValueError."""
        if not 1 <= floor <= self.max_floor:
            raise ValueError(f"Floor {floor} outside bounds")
        if floor not in self.queue:
            self.queue.append(floor)
        self.bus.publish("cab.queue", {"cab": self.cab_id, "queue": list(self.queue)})

    def step(self) -> None:
        """Advance one tick: move one floor toward the head of the queue, or
        open doors on arrival, then publish the resulting status."""
        if not self.queue:
            self.transition("IDLE")
            self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
            return
        target = self.queue[0]
        if self.current_floor < target:
            self.current_floor += 1
            self.transition("MOVING")
        elif self.current_floor > target:
            self.current_floor -= 1
            self.transition("MOVING")
        else:
            # Arrived: open doors, announce, then settle back to IDLE.
            self.queue.pop(0)
            self.transition("DOORS_OPEN")
            self.bus.publish("cab.arrived", {"cab": self.cab_id, "floor": self.current_floor})
            self.transition("IDLE")
        self.bus.publish("cab.status", {"cab": self.cab_id, "state": self.state, "floor": self.current_floor})
class DispatchStrategy:
    """Scores cabs by distance, load, and direction fit; cab id breaks ties."""

    def choose(self, request: CallRequest, cabs: Iterable[ElevatorCab]) -> ElevatorCab:
        def score(cab: ElevatorCab) -> Tuple[int, int, int, str]:
            distance = abs(cab.current_floor - request.floor)
            heading_ok = (
                (request.direction == "UP" and cab.current_floor <= request.floor)
                or (request.direction == "DOWN" and cab.current_floor >= request.floor)
            )
            return (distance, len(cab.queue), 0 if heading_ok else 1, cab.cab_id)

        return min(cabs, key=score)
class Command(Protocol):
    """Structural interface for executable commands (command pattern)."""

    def execute(self) -> None:
        ...
class RequestElevatorCommand:
    """Command that submits a hall call by publishing it on the bus."""

    def __init__(self, bus: EventBus, request: CallRequest):
        self.bus = bus
        self.request = request

    def execute(self) -> None:
        """Publish the wrapped request as a "call.requested" event."""
        self.bus.publish("call.requested", {"request": self.request})
@dataclass
class CabRecord:
    """Repository row pairing a cab with its current availability flag."""

    cab: ElevatorCab
    online: bool = True  # False while the cab is marked unhealthy/offline
class CabRepository:
    """Keeps cab availability records and answers which cabs may take calls."""

    def __init__(self, records: Dict[str, CabRecord]):
        self._records = records

    def set_online(self, cab_id: str) -> None:
        self._records[cab_id].online = True

    def set_offline(self, cab_id: str) -> None:
        self._records[cab_id].online = False

    def online_cabs(self) -> List[ElevatorCab]:
        """All cabs currently marked available for dispatch."""
        return [entry.cab for entry in self._records.values() if entry.online]

    def snapshot(self) -> Dict[str, bool]:
        """Map of cab id -> online flag, for reporting."""
        return {cab_id: entry.online for cab_id, entry in self._records.items()}
class AssignmentJournal:
    """Records which requests each cab owns so they can be replayed on failover."""

    def __init__(self) -> None:
        self._assignments: Dict[str, List[CallRequest]] = defaultdict(list)

    def assign(self, cab_id: str, request: CallRequest) -> None:
        """Record `request` for `cab_id`, skipping exact floor+direction duplicates."""
        existing = self._assignments[cab_id]
        duplicate = any(
            req.floor == request.floor and req.direction == request.direction
            for req in existing
        )
        if not duplicate:
            existing.append(request)

    def complete(self, cab_id: str, floor: int) -> None:
        """Drop every assignment for `cab_id` at `floor` once served."""
        self._assignments[cab_id] = [req for req in self._assignments[cab_id] if req.floor != floor]

    def failover(self, cab_id: str) -> List[CallRequest]:
        """Remove and return all outstanding requests of a failed cab."""
        return self._assignments.pop(cab_id, [])

    def snapshot(self) -> Dict[str, List[int]]:
        """Map of cab id -> outstanding floors, for reporting."""
        return {cab: [req.floor for req in requests] for cab, requests in self._assignments.items()}
class ElevatorOrchestrator:
    """Event-driven dispatcher: queues hall calls and assigns them to online cabs.

    Requests that cannot be assigned (no cab online) stay in `pending` and are
    retried whenever an arrival or a health-state change is observed.
    """

    def __init__(self, repository: CabRepository, journal: AssignmentJournal, strategy: DispatchStrategy, bus: EventBus):
        self.repository = repository
        self.journal = journal
        self.strategy = strategy
        self.bus = bus
        self.pending: Deque[CallRequest] = deque()  # FIFO of unassigned hall calls
        bus.subscribe("call.requested", self._handle_request)
        bus.subscribe("cab.arrived", self._handle_arrival)
        bus.subscribe("cab.online", self._handle_online)
        bus.subscribe("cab.offline", self._handle_offline)

    def request(self, floor: int, direction: str) -> Command:
        """Build a command that publishes a new hall call when executed."""
        return RequestElevatorCommand(self.bus, CallRequest(floor, direction))

    def _handle_request(self, payload: dict) -> None:
        self.pending.append(payload["request"])
        self._assign_pending()

    def _handle_arrival(self, payload: dict) -> None:
        # Mark the stop served in the journal, then try to hand out queued calls.
        cab_id = payload["cab"]
        floor = payload["floor"]
        self.journal.complete(cab_id, floor)
        self.bus.publish("call.completed", {"cab": cab_id, "floor": floor})
        self._assign_pending()

    def _handle_online(self, _: dict) -> None:
        self._assign_pending()

    def _handle_offline(self, _: dict) -> None:
        self._assign_pending()

    def _assign_pending(self) -> None:
        """Assign queued requests in order until none remain or no cab is online."""
        while self.pending:
            candidates = self.repository.online_cabs()
            if not candidates:
                return  # keep requests pending until a cab comes back online
            request = self.pending[0]
            cab = self.strategy.choose(request, candidates)
            if request.floor not in cab.queue:
                cab.enqueue(request.floor)
            self.journal.assign(cab.cab_id, request)
            self.bus.publish("call.assigned", {"cab": cab.cab_id, "floor": request.floor, "direction": request.direction})
            self.pending.popleft()
class HealthMonitor:
    """Translates raw cab.health reports into repository flags and bus events."""

    def __init__(self, repository: CabRepository, bus: EventBus):
        self.repository = repository
        self.bus = bus
        self.last_floor: Dict[str, int] = {}   # last reported floor per cab
        bus.subscribe("cab.status", self._capture_status)
        bus.subscribe("cab.health", self._handle_health)

    def _capture_status(self, payload: dict) -> None:
        # Remember where each cab was last seen, for failover announcements.
        self.last_floor[payload["cab"]] = payload["floor"]

    def _handle_health(self, payload: dict) -> None:
        cab_id = payload["cab"]
        floor = self.last_floor.get(cab_id, 1)  # default to the lobby if never seen
        if payload["online"]:
            self.repository.set_online(cab_id)
            self.bus.publish("cab.online", {"cab": cab_id, "floor": floor})
        else:
            self.repository.set_offline(cab_id)
            self.bus.publish("cab.offline", {"cab": cab_id, "floor": floor})
class FailoverSaga:
    """Requeues a failed cab's outstanding calls as fresh hall requests."""

    def __init__(self, bus: EventBus, journal: AssignmentJournal):
        self.bus = bus
        self.journal = journal
        bus.subscribe("cab.offline", self._requeue)

    def _requeue(self, payload: dict) -> None:
        cab_id = payload["cab"]
        cab_floor = payload["floor"]
        for request in self.journal.failover(cab_id):
            self.bus.publish("call.requeued", {"from": cab_id, "floor": request.floor})
            direction = request.direction
            if direction == "IDLE":
                # Infer a direction from where the failed cab last was.
                direction = "UP" if request.floor >= cab_floor else "DOWN"
            self.bus.publish("call.requested", {"request": CallRequest(request.floor, direction)})
class MetricsProjection:
    """Counts completed, requeued, and degraded-cab events from the bus."""

    def __init__(self, bus: EventBus) -> None:
        self.counters: Dict[str, int] = {"completed": 0, "requeued": 0, "degraded": 0}
        self.degraded: set[str] = set()   # cabs already counted as degraded
        bus.subscribe("call.completed", self._on_completed)
        bus.subscribe("call.requeued", self._on_requeued)
        bus.subscribe("cab.offline", self._on_offline)

    def _on_completed(self, _: dict) -> None:
        self.counters["completed"] += 1

    def _on_requeued(self, _: dict) -> None:
        self.counters["requeued"] += 1

    def _on_offline(self, payload: dict) -> None:
        # Count each cab at most once, however many times it goes offline.
        cab = payload["cab"]
        if cab not in self.degraded:
            self.degraded.add(cab)
            self.counters["degraded"] += 1
class FleetSupervisor:
    """Drives the simulation: steps every cab, then drains the event bus."""

    def __init__(self, cabs: Iterable[ElevatorCab], bus: EventBus):
        self.cabs = list(cabs)
        self.bus = bus

    def tick(self) -> None:
        """One simulation round: advance all cabs, then deliver queued events."""
        for cab in self.cabs:
            cab.step()
        self.bus.pump()
def main() -> None:
    """Demo: full resilient fleet with a mid-run CAR-B outage and recovery."""
    bus = EventBus()
    cabs = [ElevatorCab(name, max_floor=20, bus=bus) for name in ("CAR-A", "CAR-B", "CAR-C")]
    repository = CabRepository({cab.cab_id: CabRecord(cab) for cab in cabs})
    journal = AssignmentJournal()
    orchestrator = ElevatorOrchestrator(repository, journal, DispatchStrategy(), bus)
    HealthMonitor(repository, bus)
    FailoverSaga(bus, journal)
    metrics = MetricsProjection(bus)
    supervisor = FleetSupervisor(cabs, bus)
    bus.subscribe("call.assigned", lambda event: print("[assigned]", event))
    bus.subscribe("call.completed", lambda event: print("[completed]", event))
    bus.subscribe("call.requeued", lambda event: print("[requeued]", event))
    for floor, direction in [(6, "UP"), (14, "DOWN"), (3, "UP"), (18, "DOWN")]:
        orchestrator.request(floor, direction).execute()
    bus.pump()
    for _ in range(6):
        supervisor.tick()
    # Knock CAR-B offline, let the saga requeue its calls, then recover it.
    bus.publish("cab.health", {"cab": "CAR-B", "online": False})
    bus.pump()
    for _ in range(4):
        supervisor.tick()
    bus.publish("cab.health", {"cab": "CAR-B", "online": True})
    bus.pump()
    for _ in range(4):
        supervisor.tick()
    print("Assignments snapshot:", journal.snapshot())
    print("Cab availability:", repository.snapshot())
    print("Metrics:", metrics.counters)


if __name__ == "__main__":
    main()
Progressively build the producer-consumer pipeline from basic bounded queues to worker pools and resilient retry/dead-letter handling.
Stream Processing Stack ├─ Level 1: Bounded Queue ├─ Level 2: Worker Pool └─ Level 3: Resilient Stream Processor
Level 1 — Core Implementation
Implement a bounded blocking queue with producer and consumer threads. Sample Input: Producers push items 1..5. Sample Output: Consumers pop in FIFO order respecting capacity.
import threading
from collections import deque
from typing import Deque, Optional
class BoundedQueue:
    """Classic bounded blocking FIFO using two conditions over one shared lock.

    put() blocks while full; get() blocks while empty.
    """

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.items: Deque[int] = deque()
        self.lock = threading.Lock()
        # Both conditions share the same lock so their predicates stay consistent.
        self.not_full = threading.Condition(self.lock)
        self.not_empty = threading.Condition(self.lock)

    def put(self, item: int) -> None:
        """Append `item`, blocking until there is room."""
        with self.not_full:
            while len(self.items) >= self.capacity:  # loop guards against spurious wakeups
                self.not_full.wait()
            self.items.append(item)
            self.not_empty.notify()  # wake one waiting consumer

    def get(self) -> int:
        """Remove and return the oldest item, blocking until one exists."""
        with self.not_empty:
            while not self.items:  # loop guards against spurious wakeups
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()  # wake one waiting producer
            return item
def producer(queue: BoundedQueue, name: str, numbers: range) -> None:
    """Push each number onto the queue, logging after every successful put."""
    for value in numbers:
        queue.put(value)
        print(f"{name} produced {value}")
def consumer(queue: BoundedQueue, name: str, consume: int) -> None:
    """Pop exactly `consume` items, logging each one."""
    for _ in range(consume):
        print(f"{name} consumed {queue.get()}")
def main() -> None:
    """Run one producer and one consumer over a capacity-2 queue."""
    queue = BoundedQueue(2)
    threads = [
        threading.Thread(target=producer, args=(queue, "P1", range(1, 6))),
        threading.Thread(target=consumer, args=(queue, "C1", 5)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Create a thread pool that consumes tasks concurrently, supporting graceful shutdown and backpressure. Sample Input: Submit 6 jobs to a pool of 3 workers. Sample Output: Jobs processed concurrently and queue drained during shutdown.
import threading
import time
from queue import Queue, Empty
from typing import Callable
class WorkerPool:
    """Fixed-size daemon thread pool consuming tasks from a shared queue.

    shutdown() waits for all submitted tasks to finish, then stops the workers.
    """

    def __init__(self, workers: int) -> None:
        self.tasks: "Queue[Callable[[], None]]" = Queue()
        self.threads = [threading.Thread(target=self._worker, daemon=True) for _ in range(workers)]
        self.stop_flag = threading.Event()
        for thread in self.threads:
            thread.start()

    def submit(self, task: Callable[[], None]) -> None:
        """Enqueue a no-argument callable for execution by any worker."""
        self.tasks.put(task)

    def _worker(self) -> None:
        """Worker loop: poll for tasks until the stop flag is set."""
        while not self.stop_flag.is_set():
            try:
                task = self.tasks.get(timeout=0.2)  # timeout lets us re-check stop_flag
            except Empty:
                continue
            try:
                # BUGFIX: previously an exception escaping task() killed this
                # worker thread, silently shrinking the pool (and a one-worker
                # pool could then hang shutdown()'s Queue.join forever).
                task()
            except Exception as exc:
                print(f"Task failed: {exc!r}")
            finally:
                self.tasks.task_done()  # keep Queue.join() accounting correct

    def shutdown(self) -> None:
        """Block until the queue drains, then stop and join the workers."""
        self.tasks.join()
        self.stop_flag.set()
        for thread in self.threads:
            thread.join(timeout=0.1)
def main() -> None:
    """Demo: six short jobs on a three-worker pool with graceful shutdown."""
    pool = WorkerPool(3)
    for idx in range(6):
        pool.submit(lambda idx=idx: print(f"Worker executed job {idx}") or time.sleep(0.1))
    pool.shutdown()


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Build a resilient stream processor that retries failing jobs with exponential backoff and sends irrecoverable failures to a dead-letter queue. Sample Input: Stream with intermittent failures. Sample Output: Successful retries logged and failed jobs captured for inspection.
import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
from typing import Callable
@dataclass
class StreamMessage:
    """A unit of work flowing through the stream processor."""

    payload: str       # opaque message body
    attempts: int = 0  # delivery attempts made so far
class DeadLetterQueue:
    """Collects messages that exhausted their retries, with the failure reason."""

    def __init__(self) -> None:
        self.messages = []  # list of (message, reason) tuples

    def record(self, message: StreamMessage, reason: str) -> None:
        """Log and retain an irrecoverable message."""
        print("Dead letter:", message.payload, reason)
        self.messages.append((message, reason))
class ResilientStreamProcessor:
    """Pull-based processor with exponential-backoff retries and a dead-letter queue.

    start() drains the queue on the calling thread; a failed message is retried
    by re-enqueueing it until `max_attempts` is reached.
    """

    def __init__(self, handler: Callable[[StreamMessage], None], max_attempts: int = 3) -> None:
        self.queue: "Queue[StreamMessage]" = Queue()
        self.handler = handler
        self.max_attempts = max_attempts
        self.dead_letters = DeadLetterQueue()

    def publish(self, payload: str) -> None:
        """Enqueue a fresh message (zero attempts so far)."""
        self.queue.put(StreamMessage(payload))

    def start(self) -> None:
        """Process messages until the queue stays empty for the poll timeout."""
        while True:
            try:
                message = self.queue.get(timeout=0.2)
            except Empty:
                break  # queue drained: stop processing
            try:
                self.handler(message)
                print("Processed:", message.payload)
            except Exception as exc:
                message.attempts += 1
                if message.attempts >= self.max_attempts:
                    self.dead_letters.record(message, str(exc))
                else:
                    # Exponential backoff: 0.2s, 0.4s, ... before re-enqueueing.
                    backoff = 0.1 * (2 ** message.attempts)
                    print(f"Retrying {message.payload} in {backoff:.2f}s")
                    time.sleep(backoff)  # NOTE: blocks the whole loop while waiting
                    self.queue.put(message)
            finally:
                self.queue.task_done()  # balance this get() for Queue.join() users
def flaky_handler(message: StreamMessage) -> None:
    """Simulated unreliable consumer: fails roughly 40% of the time."""
    failed = random.random() < 0.4
    if failed:
        raise RuntimeError("transient failure")
def main() -> None:
    """Feed four messages through the processor and report the DLQ."""
    random.seed(6)
    processor = ResilientStreamProcessor(flaky_handler)
    for payload in ("A", "B", "C", "D"):
        processor.publish(payload)
    processor.start()
    print("Dead letters:", processor.dead_letters.messages)


if __name__ == "__main__":
    main()
Progressively enhance the task scheduler from delayed-job execution to recurring scheduling and a resilient cron engine with persistence.
Task Scheduler Stack ├─ Level 1: Delayed Job Scheduler ├─ Level 2: Recurring Jobs Coordinator └─ Level 3: Resilient Cron Engine
Level 1 — Core Implementation
Build a simple task scheduler that executes jobs at future timestamps. Sample Input: schedule("job1", run in 0.5s). Sample Output: Job executed at approximately scheduled time in FIFO order.
import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, List
@dataclass(order=True)
class ScheduledTask:
    """Min-heap entry ordered by run_at; the action is excluded from ordering."""
    run_at: float
    action: Callable[[], None] = field(compare=False)
class TaskScheduler:
    """Delayed-job scheduler: a single daemon worker drains a min-heap."""

    def __init__(self) -> None:
        self.tasks: List[ScheduledTask] = []
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        # Daemon worker dies with the interpreter, so there is no shutdown path.
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def schedule(self, delay: float, action: Callable[[], None]) -> None:
        """Queue *action* to run roughly *delay* seconds from now."""
        with self.condition:
            heapq.heappush(self.tasks, ScheduledTask(time.time() + delay, action))
            # Wake the worker in case the new task is now the earliest.
            self.condition.notify()

    def _run(self) -> None:
        # Worker loop: sleep until the heap's head task is due, then run it.
        while True:
            with self.condition:
                while not self.tasks:
                    self.condition.wait()
                task = self.tasks[0]
                now = time.time()
                if task.run_at > now:
                    # Timed wait; a newly scheduled earlier task re-wakes us.
                    self.condition.wait(task.run_at - now)
                    continue
                heapq.heappop(self.tasks)
            # Run outside the lock so schedule() is not blocked by the action.
            task.action()
def main() -> None:
    """Schedule two delayed print jobs and give them time to fire."""
    sched = TaskScheduler()
    sched.schedule(0.2, lambda: print("Task A at", time.time()))
    sched.schedule(0.4, lambda: print("Task B at", time.time()))
    time.sleep(1)


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Support recurring jobs with configurable intervals and concurrent execution workers. Sample Input: schedule job every 0.3 seconds. Sample Output: Job executed roughly at interval on different worker threads.
import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional
@dataclass(order=True)
class RecurringTask:
    """Heap entry ordered by next_run; interval/action excluded from ordering."""
    next_run: float
    interval: float = field(compare=False)
    action: Callable[[], None] = field(compare=False)


class RecurringScheduler:
    """Runs recurring tasks on a small pool of worker threads.

    Bug fix: workers now exit as soon as ``stop`` is set, even when tasks
    remain queued. Recurring tasks re-enqueue themselves after every run,
    so the queue never drains; the previous exit condition
    (``stop and not queue``) therefore made ``shutdown`` deadlock on
    ``worker.join()``.
    """

    def __init__(self, workers: int = 2) -> None:
        self.queue: List[RecurringTask] = []
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        self.stop = False
        self.workers = [threading.Thread(target=self._worker, daemon=True) for _ in range(workers)]
        for worker in self.workers:
            worker.start()

    def schedule(self, interval: float, action: Callable[[], None]) -> None:
        """Register *action* to run every *interval* seconds."""
        with self.condition:
            heapq.heappush(self.queue, RecurringTask(time.time() + interval, interval, action))
            self.condition.notify()

    def _worker(self) -> None:
        # Worker loop: wait for the earliest due task, run it, re-enqueue it.
        while True:
            with self.condition:
                while not self.queue and not self.stop:
                    self.condition.wait()
                if self.stop:
                    return
                task = self.queue[0]
                now = time.time()
                if task.next_run > now:
                    # Timed wait; shutdown's notify_all also wakes us here.
                    self.condition.wait(task.next_run - now)
                    continue
                heapq.heappop(self.queue)
            try:
                # Run outside the lock so other workers keep making progress.
                task.action()
            finally:
                with self.condition:
                    # Do not re-enqueue once stopping, or shutdown would hang.
                    if not self.stop:
                        heapq.heappush(self.queue, RecurringTask(time.time() + task.interval, task.interval, task.action))
                        self.condition.notify()

    def shutdown(self) -> None:
        """Signal all workers to stop and wait for them to exit."""
        with self.condition:
            self.stop = True
            self.condition.notify_all()
        for worker in self.workers:
            worker.join()
def main() -> None:
    """Run heartbeat and cleanup jobs for ~1.5s, then stop the pool."""
    sched = RecurringScheduler(workers=2)
    sched.schedule(0.3, lambda: print("Heartbeat at", time.time()))
    sched.schedule(0.5, lambda: print("Cleanup at", time.time()))
    time.sleep(1.5)
    sched.shutdown()


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Persist scheduled jobs, recover after crashes, and retry failed executions with jitter to avoid thundering herds. Sample Input: Jobs persisted to disk with injected failures. Sample Output: Failed job retried, state recovered across runs.
import json
import os
import random
import threading
import time
from dataclasses import dataclass, asdict
from typing import Callable, Dict, List
@dataclass
class PersistentJob:
    """Durable description of a recurring job (serialized to JSON on disk)."""
    job_id: str
    interval: float
    next_run: float
class JsonJobStore:
    """Persists the full job list as a single JSON document on disk."""

    def __init__(self, path: str) -> None:
        self.path = path
        if not os.path.exists(path):
            self._write([])

    def _write(self, payload: list) -> None:
        # Rewrite the whole document; the job set is tiny.
        with open(self.path, "w", encoding="utf-8") as handle:
            json.dump(payload, handle)

    def load(self) -> List[PersistentJob]:
        """Deserialize every persisted job."""
        with open(self.path, "r", encoding="utf-8") as handle:
            raw = json.load(handle)
        return [PersistentJob(**entry) for entry in raw]

    def save(self, jobs: List[PersistentJob]) -> None:
        """Overwrite the store with *jobs*."""
        self._write([asdict(job) for job in jobs])
class ResilientCronEngine:
    """Recurring-job engine that persists schedules and retries failures.

    NOTE(review): ``_execute`` assumes a handler is registered for every
    persisted job id; a job recovered from disk without a matching handler
    would raise ``KeyError`` -- confirm callers always re-register handlers
    after a restart.
    """

    def __init__(self, store: JsonJobStore, handlers: Dict[str, Callable[[], None]]) -> None:
        self.store = store
        self.handlers = handlers
        # Recover previously persisted jobs so schedules survive restarts.
        self.jobs = {job.job_id: job for job in store.load()}
        self.lock = threading.Lock()
        self.stop = False
        threading.Thread(target=self._loop, daemon=True).start()

    def register(self, job_id: str, interval: float) -> None:
        """Add (or replace) a job and persist the new schedule."""
        with self.lock:
            job = PersistentJob(job_id, interval, time.time() + interval)
            self.jobs[job_id] = job
            self.store.save(list(self.jobs.values()))

    def _loop(self) -> None:
        # Poll every 100ms for due jobs; execution happens outside the lock.
        while not self.stop:
            now = time.time()
            due = []
            with self.lock:
                for job in self.jobs.values():
                    if job.next_run <= now:
                        due.append(job)
            for job in due:
                self._execute(job)
            time.sleep(0.1)

    def _execute(self, job: PersistentJob) -> None:
        # Up to 3 attempts; backoff includes jitter to avoid synchronized
        # retries across jobs (thundering herd).
        handler = self.handlers[job.job_id]
        attempt = 0
        while True:
            try:
                handler()
                break
            except Exception as exc:
                attempt += 1
                if attempt >= 3:
                    print(f"Job {job.job_id} failed permanently:", exc)
                    break
                backoff = 0.2 * (2 ** attempt) + random.random() * 0.1
                print(f"Retrying job {job.job_id} in {backoff:.2f}s due to {exc}")
                time.sleep(backoff)
        with self.lock:
            # Reschedule and persist regardless of success or failure.
            job.next_run = time.time() + job.interval
            self.store.save(list(self.jobs.values()))

    def shutdown(self) -> None:
        # Cooperative stop; the daemon loop exits on its next poll.
        self.stop = True
def flaky_job() -> None:
    """Job that raises ~30% of the time to exercise the retry path."""
    if random.random() >= 0.3:
        print("Flaky job executed at", time.time())
        return
    raise RuntimeError("intermittent failure")
def heartbeat_job() -> None:
    """Always-successful job used as a liveness signal."""
    now = time.time()
    print("Heartbeat job at", now)
def main() -> None:
    """Run the cron engine for ~2s against a flaky and a heartbeat job."""
    random.seed(1)
    job_store = JsonJobStore("/tmp/cron-jobs.json")
    cron = ResilientCronEngine(job_store, {"flaky": flaky_job, "heartbeat": heartbeat_job})
    if "flaky" not in cron.jobs:
        cron.register("flaky", 0.4)
        cron.register("heartbeat", 0.6)
    time.sleep(2)
    cron.shutdown()


if __name__ == "__main__":
    main()
Grow the order management platform from basic state handling to concurrent orchestration and a resilient saga-based workflow.
Order Management Stack ├─ Level 1: State Machine ├─ Level 2: Concurrent Orchestrator └─ Level 3: Resilient Event-Sourced Saga
Level 1 — Core Implementation
Model an order lifecycle with states such as CREATED, PACKED, and SHIPPED, enforcing valid transitions. Sample Input: order.advance(OrderState.PACKED), order.advance(OrderState.SHIPPED). Sample Output: Order transitions through valid states and rejects invalid moves.
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, Set
class OrderState(Enum):
    """Lifecycle states for an order."""
    CREATED = auto()
    PACKED = auto()
    SHIPPED = auto()
    DELIVERED = auto()
    CANCELLED = auto()


# Legal forward transitions; states absent from the map accept no advance().
ALLOWED: Dict[OrderState, Set[OrderState]] = {
    OrderState.CREATED: {OrderState.PACKED, OrderState.CANCELLED},
    OrderState.PACKED: {OrderState.SHIPPED},
    OrderState.SHIPPED: {OrderState.DELIVERED},
}


@dataclass
class Order:
    """An order plus its current lifecycle state."""
    order_id: str
    state: OrderState = OrderState.CREATED

    def advance(self, next_state: OrderState) -> None:
        """Move to *next_state*; raises RuntimeError on an illegal jump."""
        if next_state not in ALLOWED.get(self.state, set()):
            raise RuntimeError(f"Invalid transition {self.state} -> {next_state}")
        self.state = next_state

    def cancel(self) -> None:
        """Cancel unless the order already reached a terminal state."""
        if self.state in (OrderState.DELIVERED, OrderState.CANCELLED):
            raise RuntimeError("Cannot cancel final order")
        self.state = OrderState.CANCELLED
def main() -> None:
    """Walk one order through a packed -> shipped flow."""
    demo = Order("O1")
    print(demo)
    demo.advance(OrderState.PACKED)
    demo.advance(OrderState.SHIPPED)
    print("Current state:", demo.state)


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Handle concurrent updates and publish events when orders transition states. Sample Input: Multiple threads updating orders. Sample Output: Consistent order states with emitted events logged.
from __future__ import annotations
import threading
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, DefaultDict, Dict, List
class OrderState(Enum):
    """Order lifecycle states (this level has no cancellation)."""
    CREATED = auto()
    PACKED = auto()
    SHIPPED = auto()
    DELIVERED = auto()
# Legal forward transitions; DELIVERED is terminal.
TRANSITIONS = {
    OrderState.CREATED: {OrderState.PACKED},
    OrderState.PACKED: {OrderState.SHIPPED},
    OrderState.SHIPPED: {OrderState.DELIVERED},
}
@dataclass
class Order:
    """Mutable order record; transitions are serialized by OrderService's per-order lock."""
    order_id: str
    state: OrderState = OrderState.CREATED
class EventBus:
    """Minimal synchronous pub/sub hub keyed by event name."""

    def __init__(self) -> None:
        self.subscribers: DefaultDict[str, List[Callable[[Order], None]]] = defaultdict(list)

    def subscribe(self, event: str, handler: Callable[[Order], None]) -> None:
        """Register *handler* to be called on every *event* publication."""
        self.subscribers[event].append(handler)

    def publish(self, event: str, order: Order) -> None:
        """Invoke every handler registered for *event*, in subscription order."""
        for listener in self.subscribers[event]:
            listener(order)
class OrderService:
    """Thread-safe order registry with per-order transition locks."""

    def __init__(self, bus: EventBus) -> None:
        self.bus = bus
        self.orders: Dict[str, Order] = {}
        self.locks: DefaultDict[str, threading.Lock] = defaultdict(threading.Lock)

    def create(self, order_id: str) -> Order:
        """Register a new order and announce it on the bus."""
        order = Order(order_id)
        self.orders[order_id] = order
        self.bus.publish("created", order)
        return order

    def transition(self, order_id: str, next_state: OrderState) -> None:
        """Atomically advance an order; publishes the new state's event.

        Raises RuntimeError when the requested jump is not legal.
        """
        with self.locks[order_id]:
            order = self.orders[order_id]
            if next_state not in TRANSITIONS.get(order.state, set()):
                raise RuntimeError("Invalid transition")
            order.state = next_state
            self.bus.publish(next_state.name.lower(), order)
def main() -> None:
    """Race two workers over one order.

    Fix: losing transitions are caught and logged instead of killing the
    worker thread with an unhandled RuntimeError (with two threads racing
    the same CREATED -> PACKED -> SHIPPED path, one of each pair of
    duplicate transitions must fail).
    """
    bus = EventBus()
    bus.subscribe("shipped", lambda order: print("Order shipped:", order.order_id))
    service = OrderService(bus)
    service.create("O1")

    def worker() -> None:
        for target in (OrderState.PACKED, OrderState.SHIPPED):
            try:
                service.transition("O1", target)
            except RuntimeError as exc:
                # Expected when the other thread already made this move.
                print("Transition rejected:", target.name, exc)

    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print("Final state:", service.orders["O1"].state)


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Coordinate order, payment, and inventory services using a saga with an outbox to guarantee event delivery. Sample Input: 3 orders with intermittent payment failures. Sample Output: Successful orders committed, failed ones compensated and recorded.
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from enum import Enum, auto
from typing import List
class OrderState(Enum):
    """Saga outcomes: CONFIRMED on success, FAILED after compensation."""
    CREATED = auto()
    CONFIRMED = auto()
    FAILED = auto()
@dataclass
class Order:
    # Order tracked by the saga; state is set by OrderSaga.execute.
    order_id: str
    state: OrderState = OrderState.CREATED
class PaymentService:
    """Fake payment gateway that fails with a configurable probability."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def charge(self, order_id: str) -> None:
        """Charge for *order_id*; raises RuntimeError on a simulated failure."""
        roll = random.random()
        if roll < self.failure_rate:
            raise RuntimeError("payment failure")
class InventoryService:
    """Stub inventory ledger; reserve/release just log the action."""

    def reserve(self, order_id: str) -> None:
        print("Inventory reserved for", order_id)

    def release(self, order_id: str) -> None:
        print("Inventory released for", order_id)
class Outbox:
    """Append-only event record standing in for a transactional outbox."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def record(self, event: str) -> None:
        """Retain *event* and log it."""
        self.events.append(event)
        print("Recorded event:", event)
class OrderSaga:
    """Orchestrates reserve -> charge (with retries) -> confirm, compensating
    (inventory release + failure event) when payment ultimately fails."""

    def __init__(self, payments: PaymentService, inventory: InventoryService, outbox: Outbox) -> None:
        self.payments = payments
        self.inventory = inventory
        self.outbox = outbox

    def _charge_with_retries(self, order_id: str) -> None:
        # Up to three attempts with linear backoff; re-raises on exhaustion.
        for attempt in range(1, 4):
            try:
                self.payments.charge(order_id)
                return
            except Exception:
                if attempt >= 3:
                    raise
                backoff = 0.2 * attempt
                print("Retry payment in", backoff)
                time.sleep(backoff)

    def execute(self, order: Order) -> None:
        """Run the saga for *order*, ending in CONFIRMED or FAILED."""
        try:
            self.inventory.reserve(order.order_id)
            self._charge_with_retries(order.order_id)
            order.state = OrderState.CONFIRMED
            self.outbox.record(f"order.confirmed:{order.order_id}")
        except Exception as exc:
            order.state = OrderState.FAILED
            self.inventory.release(order.order_id)
            self.outbox.record(f"order.failed:{order.order_id}:{exc}")
def main() -> None:
    """Run three orders through the saga with a 50%-flaky payment service."""
    random.seed(12)
    saga = OrderSaga(PaymentService(0.5), InventoryService(), Outbox())
    for idx in range(3):
        current = Order(f"O{idx}")
        saga.execute(current)
        print(current)


if __name__ == "__main__":
    main()
Progressively enhance the auction system from simple bid tracking to concurrent bidding with fallback flows.
Auction Platform ├─ Level 1: Highest Bid Wins ├─ Level 2: Concurrent Bid Engine └─ Level 3: Resilient Auction Orchestrator
Level 1 — Core Implementation
Implement a simple auction that accepts bids and determines the winner when closed. Sample Input: bids from users A,B,C. Sample Output: Winner with highest bid and notifications sent.
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable, List, Optional
@dataclass
class Bid:
    """A single bid; HighestBidStrategy breaks amount ties by earlier timestamp."""
    bidder: str
    amount: float
    timestamp: float
class HighestBidStrategy:
    """Winner = highest amount; ties go to the earliest timestamp."""

    def select(self, bids: List[Bid]) -> Optional[Bid]:
        """Return the winning bid, or None when there are no bids."""
        if not bids:
            return None
        best = max(bids, key=lambda entry: (entry.amount, -entry.timestamp))
        return best
class Auction:
    """Collects bids and settles via an injected winner-selection strategy."""

    def __init__(self, strategy: HighestBidStrategy, notifier: Callable[[str], None]) -> None:
        self.bids: List[Bid] = []
        self.strategy = strategy
        self.notifier = notifier

    def place_bid(self, bid: Bid) -> None:
        """Accept a bid; no validation at this level."""
        self.bids.append(bid)

    def close(self) -> Optional[Bid]:
        """Settle the auction: notify about the winner (if any) and return it."""
        winner = self.strategy.select(self.bids)
        if winner is not None:
            self.notifier(f"Auction won by {winner.bidder} for {winner.amount}")
        return winner
def main() -> None:
    """Three bids from Alice, Bob and Carol; Bob's 120 should win."""
    auction = Auction(HighestBidStrategy(), print)
    for name, amount, stamp in [("Alice", 100.0, 1.0), ("Bob", 120.0, 2.0), ("Carol", 110.0, 3.0)]:
        auction.place_bid(Bid(name, amount, stamp))
    winner = auction.close()
    print("Winner:", winner)


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Support concurrent bids with automatic closing after the auction window ends. Sample Input: Multiple threads placing bids while timer runs. Sample Output: Auction closes once timer ends and reports highest bid without race conditions.
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class Bid:
    """A bid in the timed auction; ties keep the earlier-accepted bid."""
    bidder: str
    amount: float
class TimedAuction:
    """Auction that stops accepting bids once a background timer fires."""

    def __init__(self, duration: float) -> None:
        self.duration = duration
        self._lock = threading.Lock()
        self._winner: Optional[Bid] = None
        self._closed = False
        # Daemon timer thread flips _closed after `duration` seconds.
        threading.Thread(target=self._close_after_timeout, daemon=True).start()

    def _close_after_timeout(self) -> None:
        time.sleep(self.duration)
        with self._lock:
            self._closed = True

    def place_bid(self, bid: Bid) -> None:
        """Record *bid* if it beats the current leader; raises when closed."""
        with self._lock:
            if self._closed:
                raise RuntimeError("Auction closed")
            leader = self._winner
            if leader is None or bid.amount > leader.amount:
                self._winner = bid

    def winner(self) -> Optional[Bid]:
        """Current (or final) highest bid, or None if none were accepted."""
        with self._lock:
            return self._winner
def bidder_thread(auction: TimedAuction, name: str, amount: float) -> None:
    """Thread target: place one bid, printing instead of raising on closure."""
    try:
        auction.place_bid(Bid(name, amount))
    except RuntimeError as exc:
        print(name, "failed:", exc)
def main() -> None:
    """Five concurrent bidders against a 0.5s auction window."""
    auction = TimedAuction(0.5)
    bidders = [
        threading.Thread(target=bidder_thread, args=(auction, f"B{i}", 100 + i * 10))
        for i in range(5)
    ]
    for bidder in bidders:
        bidder.start()
    for bidder in bidders:
        bidder.join()
    time.sleep(0.6)
    print("Auction winner:", auction.winner())


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Coordinate bids across replicas, tolerating node failures using quorum consensus and write-ahead logs. Sample Input: Bids sent to cluster with one node failing. Sample Output: Cluster reaches consensus on highest bid, failed node recovers from log.
from __future__ import annotations
import random
from dataclasses import dataclass
# Fixed: `Optional` was imported twice in the original typing line.
from typing import Dict, List, Optional
@dataclass
class Bid:
    """A bid replicated across the cluster; winner is the maximum amount."""
    bidder: str
    amount: float
class Replica:
    """One storage node; randomly becomes unavailable on append."""

    def __init__(self, name: str, failure_rate: float = 0.2) -> None:
        self.name = name
        self.failure_rate = failure_rate
        self.log: List[Bid] = []
        self.available = True

    def append(self, bid: Bid) -> None:
        """Append to the write-ahead log, or fail and mark the node down."""
        crashed = not self.available or random.random() < self.failure_rate
        if crashed:
            self.available = False
            raise RuntimeError(f"{self.name} unavailable")
        self.log.append(bid)

    def recover(self) -> None:
        """Bring the node back online (its log keeps only what it saw)."""
        self.available = True
class AuctionCoordinator:
    """Fans bid writes out to replicas and requires a quorum of acks."""

    def __init__(self, replicas: List[Replica], quorum: int) -> None:
        self.replicas = replicas
        self.quorum = quorum

    def submit_bid(self, bid: Bid) -> None:
        """Write *bid* to all replicas; raise unless `quorum` acks arrive.

        Replicas that failed are recovered afterwards (their logs simply
        miss this bid).
        """
        acked = 0
        failed = []
        for replica in self.replicas:
            try:
                replica.append(bid)
            except Exception as exc:
                failed.append((replica, exc))
            else:
                acked += 1
        if acked < self.quorum:
            raise RuntimeError("Not enough replicas")
        for replica, _ in failed:
            replica.recover()

    def winner(self) -> Optional[Bid]:
        """Highest bid across the union of replica logs, or None."""
        merged: List[Bid] = []
        for replica in self.replicas:
            merged.extend(replica.log)
        return max(merged, key=lambda bid: bid.amount) if merged else None
def main() -> None:
    """Submit three bids against a 3-replica cluster with quorum 2."""
    random.seed(3)
    cluster = [Replica("R1", 0.3), Replica("R2", 0.1), Replica("R3", 0.2)]
    coordinator = AuctionCoordinator(cluster, quorum=2)
    for amount in (120.0, 150.0, 140.0):
        try:
            coordinator.submit_bid(Bid(f"Bidder{amount}", amount))
        except Exception as exc:
            print("Submit failed:", exc)
    print("Winning bid:", coordinator.winner())


if __name__ == "__main__":
    main()
Build the wallet service from ledger recording to concurrent transactions and resilient transfer orchestration.
Wallet Service Stack ├─ Level 1: Core Ledger ├─ Level 2: Concurrent Transactions └─ Level 3: Resilient Transfers
Level 1 — Core Implementation
Create a wallet service to credit, debit, and query balances with idempotent transaction IDs. Sample Input: credit("U1", 100, "txn1"), debit("U1", 40, "txn2"). Sample Output: Balance 60 with duplicate txn ignored.
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Set
@dataclass
class LedgerEntry:
    """Record of one wallet movement.

    NOTE(review): defined but not referenced by WalletService below --
    presumably a hook for a future audit log.
    """
    user_id: str
    amount: float
    txn_id: str
class WalletService:
    """In-memory balances with idempotent credit/debit keyed by txn id."""

    def __init__(self) -> None:
        self.balances: Dict[str, float] = {}
        self.seen_txn: Set[str] = set()

    def credit(self, user_id: str, amount: float, txn_id: str) -> None:
        """Add funds; a repeated txn_id is a silent no-op."""
        if txn_id in self.seen_txn:
            return
        current = self.balances.get(user_id, 0.0)
        self.balances[user_id] = current + amount
        self.seen_txn.add(txn_id)

    def debit(self, user_id: str, amount: float, txn_id: str) -> None:
        """Remove funds; repeated txn_id is a no-op; raises when overdrawn."""
        if txn_id in self.seen_txn:
            return
        current = self.balances.get(user_id, 0.0)
        if current < amount:
            raise RuntimeError("insufficient funds")
        self.balances[user_id] = current - amount
        self.seen_txn.add(txn_id)

    def balance(self, user_id: str) -> float:
        """Current balance, defaulting to 0.0 for unknown users."""
        return self.balances.get(user_id, 0.0)
def main() -> None:
    """Credit 100, debit 40, replay the credit (ignored): balance 60."""
    wallet = WalletService()
    wallet.credit("U1", 100.0, "txn1")
    wallet.debit("U1", 40.0, "txn2")
    wallet.credit("U1", 100.0, "txn1")  # duplicate txn id: no-op
    print("Balance:", wallet.balance("U1"))


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Handle concurrent credit/debit operations with thread safety and per-account locks. Sample Input: 4 threads posting transactions. Sample Output: Final balance consistent and no race conditions observed.
from __future__ import annotations
import threading
from collections import defaultdict
from typing import Dict, Set
class AccountLocks:
    """Lazily-created per-account locks."""

    def __init__(self) -> None:
        self._locks: Dict[str, threading.Lock] = defaultdict(threading.Lock)

    def acquire(self, user_id: str) -> threading.Lock:
        """Acquire and return the lock for *user_id* (caller releases)."""
        lock = self._locks[user_id]
        lock.acquire()
        return lock


class ConcurrentWallet:
    """Thread-safe wallet with idempotent transactions.

    Bug fix: a failed debit no longer consumes its txn_id. Previously the
    id was recorded *before* the balance check, so retrying a debit that
    failed with "insufficient funds" was silently ignored forever.
    """

    def __init__(self) -> None:
        self.balances: Dict[str, float] = defaultdict(float)
        self.seen_txn: Set[str] = set()
        self.account_locks = AccountLocks()
        self.txn_lock = threading.Lock()

    def _claim_txn(self, txn_id: str) -> bool:
        # Atomically mark txn_id as seen; False means it was a duplicate.
        with self.txn_lock:
            if txn_id in self.seen_txn:
                return False
            self.seen_txn.add(txn_id)
            return True

    def _release_txn(self, txn_id: str) -> None:
        # Forget a claimed txn_id so the operation may be retried.
        with self.txn_lock:
            self.seen_txn.discard(txn_id)

    def credit(self, user_id: str, amount: float, txn_id: str) -> None:
        """Add funds exactly once per txn_id."""
        if not self._claim_txn(txn_id):
            return
        lock = self.account_locks.acquire(user_id)
        try:
            self.balances[user_id] += amount
        finally:
            lock.release()

    def debit(self, user_id: str, amount: float, txn_id: str) -> None:
        """Withdraw funds exactly once per txn_id.

        Raises RuntimeError on insufficient funds; in that case the txn_id
        is released so the caller can retry once funds are available.
        """
        if not self._claim_txn(txn_id):
            return
        lock = self.account_locks.acquire(user_id)
        try:
            if self.balances[user_id] < amount:
                self._release_txn(txn_id)
                raise RuntimeError("insufficient funds")
            self.balances[user_id] -= amount
        finally:
            lock.release()
def worker(wallet: ConcurrentWallet, user_id: str, amount: float, txn_prefix: str) -> None:
    """Thread body: credit *amount*, then try to debit half of it back."""
    wallet.credit(user_id, amount, f"{txn_prefix}-credit")
    try:
        wallet.debit(user_id, amount / 2, f"{txn_prefix}-debit")
    except Exception as exc:
        print("Debit failed:", exc)
def main() -> None:
    """Four workers each credit 50 and debit 25 against one account."""
    wallet = ConcurrentWallet()
    posters = [threading.Thread(target=worker, args=(wallet, "U1", 50.0, f"T{i}")) for i in range(4)]
    for poster in posters:
        poster.start()
    for poster in posters:
        poster.join()
    print("Final balance:", wallet.balances["U1"])


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Integrate with an external payment network using circuit breaking and an outbox to ensure durability of debits. Sample Input: 3 payouts with flaky network. Sample Output: Successful payouts processed; failures queued for retry with logs.
from __future__ import annotations
import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
from typing import Dict
@dataclass
class Payout:
    """Outbox entry describing one pending external transfer."""
    user_id: str
    amount: float
    txn_id: str
class ExternalNetwork:
    """Simulated third-party payout rail with random failures."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def payout(self, user_id: str, amount: float) -> None:
        """Send money; raises RuntimeError on a simulated network fault."""
        dropped = random.random() < self.failure_rate
        if dropped:
            raise RuntimeError("network failure")
        print(f"Payout to {user_id} for {amount}")
class CircuitBreaker:
    """Counts consecutive failures; after `threshold` failures the breaker
    opens for `cool_down` seconds and rejects calls immediately."""

    def __init__(self, threshold: int, cool_down: float) -> None:
        self.threshold = threshold
        self.cool_down = cool_down
        self.failures = 0     # consecutive failures since the last success
        self.open_until = 0.0  # wall-clock time until which calls are rejected

    def call(self, func, *args, **kwargs):
        """Invoke *func*, tracking failures and short-circuiting while open.

        Raises RuntimeError("circuit-open") while the breaker is open;
        otherwise re-raises whatever *func* raised.
        """
        now = time.time()
        if now < self.open_until:
            raise RuntimeError("circuit-open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.open_until = now + self.cool_down
                self.failures = 0
            # Bare raise preserves the original traceback; the previous
            # `raise exc` re-raised from this except frame instead.
            raise
        else:
            self.failures = 0
            return result
class ResilientWallet:
    """Debits locally, then pushes payouts through a circuit breaker; failed
    payouts remain in the outbox for a later retry pass."""

    def __init__(self, network: ExternalNetwork) -> None:
        self.balances: Dict[str, float] = {"U1": 200.0}
        self.breaker = CircuitBreaker(2, 1.0)
        self.outbox: "Queue[Payout]" = Queue()
        self.network = network

    def payout(self, user_id: str, amount: float, txn_id: str) -> None:
        """Reserve funds immediately and queue the external transfer."""
        if self.balances.get(user_id, 0.0) < amount:
            raise RuntimeError("insufficient funds")
        self.balances[user_id] -= amount
        self.outbox.put(Payout(user_id, amount, txn_id))

    def process_outbox(self) -> None:
        """One delivery pass: drain the queue, re-queueing failures."""
        retry_later = []
        while True:
            try:
                entry = self.outbox.get_nowait()
            except Empty:
                break
            try:
                self.breaker.call(self.network.payout, entry.user_id, entry.amount)
            except Exception as exc:
                print("Payout failed:", exc, "requeueing")
                retry_later.append(entry)
            else:
                print("Payout success:", entry)
            finally:
                self.outbox.task_done()
        for entry in retry_later:
            self.outbox.put(entry)
def main() -> None:
    """Queue three payouts, then run up to five delivery passes."""
    random.seed(10)
    wallet = ResilientWallet(ExternalNetwork(0.5))
    for amount, txn in ((50.0, "tx1"), (30.0, "tx2"), (40.0, "tx3")):
        wallet.payout("U1", amount, txn)
    for _ in range(5):
        wallet.process_outbox()
        time.sleep(0.2)
    print("Remaining balance:", wallet.balances["U1"])


if __name__ == "__main__":
    main()
Grow the tic-tac-toe system from a game engine to a concurrent match server with resilient matchmaking orchestration.
Tic-Tac-Toe Platform ├─ Level 1: Core Game Engine ├─ Level 2: Concurrent Match Server └─ Level 3: Resilient Matchmaking
Level 1 — Core Implementation
Build a Tic-Tac-Toe engine that validates moves, detects winners, and reports draws. Sample Input: Sequence of moves resulting in X winning diagonally. Sample Output: Winner X detected after the winning move.
from typing import List, Optional
class TicTacToe:
    """3x3 game engine that validates moves and detects wins and draws.

    Improvements over the naive version: coordinates are bounds-checked,
    and moves after the game has ended are rejected instead of silently
    mutating a finished board.
    """

    def __init__(self) -> None:
        self.board: List[List[str]] = [["" for _ in range(3)] for _ in range(3)]
        self.current = "X"
        self.finished = False  # set once a winner or draw is detected

    def move(self, row: int, col: int) -> Optional[str]:
        """Place the current player's mark at (row, col).

        Returns "X"/"O" on a win, "DRAW" on a draw, None otherwise.
        Raises RuntimeError for an occupied cell, an out-of-range
        coordinate, or any move after the game has ended.
        """
        if self.finished:
            raise RuntimeError("Game over")
        if not (0 <= row < 3 and 0 <= col < 3):
            raise RuntimeError("Out of bounds")
        if self.board[row][col]:
            raise RuntimeError("Cell occupied")
        self.board[row][col] = self.current
        winner = self._winner()
        if winner or self._is_draw():
            self.finished = True
            return winner or "DRAW"
        self.current = "O" if self.current == "X" else "X"
        return None

    def _winner(self) -> Optional[str]:
        # Check all 8 lines: 3 rows, 3 columns, 2 diagonals.
        lines = []
        lines.extend(self.board)
        lines.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
        lines.append([self.board[i][i] for i in range(3)])
        lines.append([self.board[i][2 - i] for i in range(3)])
        for line in lines:
            if line[0] and line.count(line[0]) == 3:
                return line[0]
        return None

    def _is_draw(self) -> bool:
        # Draw when every cell is filled (callers check wins first).
        return all(cell for row in self.board for cell in row)
def main() -> None:
    """Play a scripted game that X wins on the main diagonal."""
    game = TicTacToe()
    for step in [(0, 0), (1, 0), (1, 1), (2, 0), (2, 2)]:
        outcome = game.move(*step)
        print("Move", step, "result:", outcome)
        if outcome:
            break


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Run multiple Tic-Tac-Toe matches concurrently, ensuring each session is isolated. Sample Input: Two matches progressing in parallel threads. Sample Output: Each match completes with correct results.
from __future__ import annotations
import threading
from dataclasses import dataclass
# Fixed: `Optional` was imported twice in the original typing line.
from typing import Dict, List, Optional
class TicTacToeEngine:
    """Bare-bones engine; the winner is reported as the mark that just moved."""

    def __init__(self) -> None:
        self.board = [["" for _ in range(3)] for _ in range(3)]
        self.current = "X"

    def move(self, row: int, col: int) -> Optional[str]:
        """Apply a move; returns the winning mark, "DRAW", or None."""
        if self.board[row][col]:
            raise RuntimeError("Invalid move")
        self.board[row][col] = self.current
        if self._winner():
            return self.current
        filled = all(cell for line in self.board for cell in line)
        if filled:
            return "DRAW"
        self.current = "O" if self.current == "X" else "X"
        return None

    def _winner(self) -> bool:
        rows = [list(line) for line in self.board]
        cols = [[self.board[r][c] for r in range(3)] for c in range(3)]
        diags = [
            [self.board[i][i] for i in range(3)],
            [self.board[i][2 - i] for i in range(3)],
        ]
        return any(line[0] and line.count(line[0]) == 3 for line in rows + cols + diags)
@dataclass
class MatchSession:
    """One live match: its engine plus a lock serializing its moves."""
    match_id: str
    engine: TicTacToeEngine
    lock: threading.Lock
class MatchServer:
    """Registry of isolated match sessions; moves are serialized per match."""

    def __init__(self) -> None:
        self.sessions: Dict[str, MatchSession] = {}

    def create_match(self, match_id: str) -> None:
        """Start a fresh, empty match under *match_id*."""
        session = MatchSession(match_id, TicTacToeEngine(), threading.Lock())
        self.sessions[match_id] = session

    def play(self, match_id: str, row: int, col: int) -> Optional[str]:
        """Apply a move in the given match under its session lock."""
        session = self.sessions[match_id]
        with session.lock:
            return session.engine.move(row, col)
def match_runner(server: MatchServer, match_id: str, moves: List[tuple[int, int]]) -> None:
    """Drive one match until it ends (or the move list runs out)."""
    for row, col in moves:
        outcome = server.play(match_id, row, col)
        if outcome:
            print(match_id, "ended with", outcome)
            break
def main() -> None:
    """Run two isolated matches on separate threads."""
    server = MatchServer()
    for match_id in ("M1", "M2"):
        server.create_match(match_id)
    first = threading.Thread(target=match_runner, args=(server, "M1", [(0, 0), (0, 1), (1, 1), (0, 2), (2, 2)]))
    second = threading.Thread(target=match_runner, args=(server, "M2", [(1, 1), (0, 0), (2, 2), (0, 1), (0, 2), (2, 0), (1, 0)]))
    first.start()
    second.start()
    first.join()
    second.join()


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Persist every move so matches can recover after restart, replaying events to reconstruct state. Sample Input: Moves persisted then engine reinstantiated from log. Sample Output: Replayed board matches original state.
import json
import os
from typing import List, Optional, Tuple
class EventStore:
    """Append-only JSON move log (rewritten wholesale on every append)."""

    def __init__(self, path: str) -> None:
        self.path = path
        if not os.path.exists(path):
            with open(path, "w", encoding="utf-8") as handle:
                json.dump([], handle)

    def append(self, match_id: str, move: Tuple[int, int]) -> None:
        """Persist one move event for *match_id*."""
        history = self.load_all()
        history.append({"match_id": match_id, "move": move})
        with open(self.path, "w", encoding="utf-8") as handle:
            json.dump(history, handle)

    def load_all(self) -> List[dict]:
        """Every recorded event; note JSON round-trips moves as lists."""
        with open(self.path, "r", encoding="utf-8") as handle:
            return json.load(handle)
class ReplayableTicTacToe:
    """Engine variant with no validation: moves are trusted replayed events."""

    def __init__(self) -> None:
        self.board = [["" for _ in range(3)] for _ in range(3)]
        self.current = "X"

    def apply(self, row: int, col: int) -> Optional[str]:
        """Blindly place the current mark and report a winner, if any."""
        self.board[row][col] = self.current
        result = self._winner()
        self.current = "O" if self.current == "X" else "X"
        return result

    def _winner(self) -> Optional[str]:
        candidates = []
        candidates.extend(self.board)
        candidates.extend([[self.board[r][c] for r in range(3)] for c in range(3)])
        candidates.append([self.board[i][i] for i in range(3)])
        candidates.append([self.board[i][2 - i] for i in range(3)])
        for line in candidates:
            mark = line[0]
            if mark and line.count(mark) == 3:
                return mark
        return None
def main() -> None:
    """Record a match, then rebuild the board purely from the event log.

    Fix: the /tmp log is cleared first -- events left over from a previous
    run would be replayed too, making the replayed board diverge from the
    freshly played one. Also drops the unused `winner` binding.
    """
    log_path = "/tmp/tictactoe-events.json"
    if os.path.exists(log_path):
        os.remove(log_path)
    store = EventStore(log_path)
    game = ReplayableTicTacToe()
    moves = [(0, 0), (0, 1), (1, 1), (2, 0), (2, 2)]
    for move in moves:
        game.apply(*move)
        store.append("match-1", move)
    replay = ReplayableTicTacToe()
    for event in store.load_all():
        replay.apply(*event["move"])
    print("Original board:", game.board)
    print("Replayed board:", replay.board)


if __name__ == "__main__":
    main()
Enhance the chat system from simple chat rooms to concurrent messaging infrastructure with delivery guarantees.
Chat Service Stack ├─ Level 1: Basic Chat Room ├─ Level 2: Concurrent Messaging └─ Level 3: Resilient Message Delivery
Level 1 — Core Implementation
Implement a direct messaging service that stores conversations and notifies recipients. Sample Input: send("alice","bob","hi"). Sample Output: Message stored with notification callback triggered.
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List
@dataclass
class Message:
    """A direct message; message_id/channel are unused at this level and
    reserved for the Level 2 extensions."""
    sender: str
    recipient: str
    body: str
    message_id: str | None = None
    channel: str | None = None
class ConversationRepository:
    """Stores direct-message history keyed by the unordered user pair."""

    def __init__(self) -> None:
        self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)

    @staticmethod
    def _key(user_a: str, user_b: str) -> tuple[str, str]:
        # Sort so (a, b) and (b, a) address the same conversation.
        return tuple(sorted([user_a, user_b]))

    def add_direct(self, message: Message) -> None:
        """Append *message* to its conversation."""
        self._direct[self._key(message.sender, message.recipient)].append(message)

    def history(self, user_a: str, user_b: str) -> List[Message]:
        """Snapshot (copy) of the conversation between the two users."""
        return list(self._direct[self._key(user_a, user_b)])
class ChatService:
    """Facade over the repository: store a direct message, then notify."""

    def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
        self.repo = repo
        self.notifier = notifier

    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        # Factory hook; subclasses may attach ids or channel metadata.
        return Message(sender, recipient, body)

    def send(self, sender: str, recipient: str, body: str) -> Message:
        """Persist a direct message, fire the notifier, and return it."""
        message = self._create_direct_message(sender, recipient, body)
        self.repo.add_direct(message)
        self.notifier(message)
        return message
def main() -> None:
    """Exchange two direct messages and dump the conversation."""
    repo = ConversationRepository()
    service = ChatService(repo, lambda msg: print("Notify", msg.recipient, ":", msg.body))
    service.send("alice", "bob", "hello bob")
    service.send("bob", "alice", "hey alice")
    print("History:", [(item.sender, item.body) for item in repo.history("alice", "bob")])


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Layer channel fan-out and async delivery queues on top of the Level 1 chat core by reusing the same repository and service abstractions. Sample Input: concurrent publishes to #general. Sample Output: Subscribers receive stored messages without blocking senders.
from __future__ import annotations
import asyncio
import itertools
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, Dict, List
# --- Level 1 foundation ---
@dataclass
class Message:
    """Direct or channel message; recipient is None for channel posts."""
    sender: str
    recipient: str | None
    body: str
    message_id: str | None = None
    channel: str | None = None
class ConversationRepository:
    """Direct-message store keyed by the sorted participant pair."""

    def __init__(self) -> None:
        self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)

    def add_direct(self, message: Message) -> None:
        """Append a direct message; requires a concrete recipient."""
        assert message.recipient is not None
        pair = tuple(sorted([message.sender, message.recipient]))
        self._direct[pair].append(message)

    def history(self, user_a: str, user_b: str) -> List[Message]:
        """Copy of the conversation between the two users."""
        pair = tuple(sorted([user_a, user_b]))
        return list(self._direct[pair])
class ChatService:
    """Coordinates message creation, persistence, and notification."""
    def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
        self.repo = repo
        self.notifier = notifier
    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        # Factory hook: subclasses may override to attach ids or metadata.
        return Message(sender, recipient, body)
    def send(self, sender: str, recipient: str, body: str) -> Message:
        """Persist a direct message, notify listeners, and return it."""
        created = self._create_direct_message(sender, recipient, body)
        self.repo.add_direct(created)
        self.notifier(created)
        return created
# --- Level 2 concurrency extensions ---
class ChannelRepository(ConversationRepository):
    """Extends the direct-message store with per-channel timelines."""
    def __init__(self) -> None:
        super().__init__()
        # channel name -> chronological list of posts
        self._channels: Dict[str, List[Message]] = {}
    def add_channel(self, channel: str, message: Message) -> None:
        """Append *message* to *channel*'s timeline."""
        self._channels.setdefault(channel, []).append(message)
    def channel_history(self, channel: str) -> List[Message]:
        """Return a copy of *channel*'s timeline (empty if unknown)."""
        return list(self._channels.get(channel, []))
class ConcurrentChatService(ChatService):
    """Chat service that stamps unique message ids and supports channel posts."""
    def __init__(self, repo: ChannelRepository, notifier: Callable[[Message], None]) -> None:
        super().__init__(repo, notifier)
        self.repo = repo  # re-bind to narrow the attribute to ChannelRepository
        self._counter = itertools.count(1)
    def _next_id(self) -> str:
        # Monotonically increasing string ids: "1", "2", ...
        return str(next(self._counter))
    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        return Message(sender, recipient, body, message_id=self._next_id())
    def send_to_channel(self, sender: str, channel: str, body: str) -> Message:
        """Store a channel post, notify listeners, and return the message."""
        post = Message(sender, None, body, message_id=self._next_id(), channel=channel)
        self.repo.add_channel(channel, post)
        self.notifier(post)
        return post
class AsyncChannelDispatcher:
    """Fans channel posts out to per-subscriber asyncio queues."""
    def __init__(self, service: ConcurrentChatService) -> None:
        self.service = service
        # channel -> {user -> delivery queue}
        self.subscribers: Dict[str, Dict[str, asyncio.Queue[Message]]] = defaultdict(dict)
    async def subscribe(self, user: str, channel: str) -> asyncio.Queue[Message]:
        """Register *user* on *channel* and return their delivery queue."""
        mailbox: asyncio.Queue[Message] = asyncio.Queue()
        self.subscribers[channel][user] = mailbox
        return mailbox
    async def publish(self, sender: str, channel: str, body: str) -> Message:
        """Persist the post via the service, then enqueue it for every subscriber."""
        post = self.service.send_to_channel(sender, channel, body)
        for mailbox in self.subscribers[channel].values():
            await mailbox.put(post)
        return post
async def main_async() -> None:
    """Demo: one channel post reaches both subscribers without blocking the sender."""
    repo = ChannelRepository()
    dispatcher = AsyncChannelDispatcher(ConcurrentChatService(repo, lambda message: None))
    queue_alice = await dispatcher.subscribe("alice", "#general")
    queue_bob = await dispatcher.subscribe("bob", "#general")
    async def alice_consumer() -> None:
        message = await queue_alice.get()
        print("alice received", message.body)
    async def bob_consumer() -> None:
        message = await queue_bob.get()
        print("bob received", message.body)
    await dispatcher.publish("service-bot", "#general", "Welcome to the channel!")
    await asyncio.gather(alice_consumer(), bob_consumer())
if __name__ == "__main__":
    asyncio.run(main_async())
Level 3 — Resilient Architecture
Harden the Level 2 async dispatcher with per-subscriber ack tracking and retries so shared channels gain delivery guarantees. Sample Input: bob skips the first ack. Sample Output: Retry loop redelivers until ack or dead-letter limit.
from __future__ import annotations
import asyncio
import itertools
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, Dict, List
# --- Level 1 foundation ---
@dataclass
class Message:
    """One chat message: a direct message when ``recipient`` is set, else a channel post."""
    sender: str
    recipient: str | None  # None for channel posts
    body: str
    message_id: str | None = None  # stamped by ConcurrentChatService for ack tracking
    channel: str | None = None  # target channel, set only for channel posts
class ConversationRepository:
    """Stores direct-message history keyed by the unordered user pair."""
    def __init__(self) -> None:
        # conversation key -> chronological list of messages
        self._direct: DefaultDict[tuple[str, str], List[Message]] = defaultdict(list)
    def add_direct(self, message: Message) -> None:
        """Record *message* in the sender/recipient conversation.

        Raises:
            ValueError: if the message has no direct recipient (a channel
                post).  The original used ``assert``, which is stripped under
                ``python -O``; validate explicitly instead.
        """
        if message.recipient is None:
            raise ValueError("direct message requires a recipient")
        key = tuple(sorted([message.sender, message.recipient]))
        self._direct[key].append(message)
    def history(self, user_a: str, user_b: str) -> List[Message]:
        """Return a copy of the conversation between the two users."""
        key = tuple(sorted([user_a, user_b]))
        return list(self._direct[key])
class ChatService:
    """Coordinates message creation, persistence, and notification."""
    def __init__(self, repo: ConversationRepository, notifier: Callable[[Message], None]) -> None:
        self.repo = repo
        self.notifier = notifier
    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        # Factory hook: subclasses may override to attach ids or metadata.
        return Message(sender, recipient, body)
    def send(self, sender: str, recipient: str, body: str) -> Message:
        """Persist a direct message, notify listeners, and return it."""
        created = self._create_direct_message(sender, recipient, body)
        self.repo.add_direct(created)
        self.notifier(created)
        return created
# --- Level 2 concurrency extensions ---
class ChannelRepository(ConversationRepository):
    """Extends the direct-message store with per-channel timelines."""
    def __init__(self) -> None:
        super().__init__()
        # channel name -> chronological list of posts
        self._channels: Dict[str, List[Message]] = {}
    def add_channel(self, channel: str, message: Message) -> None:
        """Append *message* to *channel*'s timeline."""
        self._channels.setdefault(channel, []).append(message)
    def channel_history(self, channel: str) -> List[Message]:
        """Return a copy of *channel*'s timeline (empty if unknown)."""
        return list(self._channels.get(channel, []))
class ConcurrentChatService(ChatService):
    """Chat service that stamps unique message ids and supports channel posts."""
    def __init__(self, repo: ChannelRepository, notifier: Callable[[Message], None]) -> None:
        super().__init__(repo, notifier)
        self.repo = repo  # re-bind to narrow the attribute to ChannelRepository
        self._counter = itertools.count(1)
    def _next_id(self) -> str:
        # Monotonically increasing string ids: "1", "2", ...
        return str(next(self._counter))
    def _create_direct_message(self, sender: str, recipient: str, body: str) -> Message:
        return Message(sender, recipient, body, message_id=self._next_id())
    def send_to_channel(self, sender: str, channel: str, body: str) -> Message:
        """Store a channel post, notify listeners, and return the message."""
        post = Message(sender, None, body, message_id=self._next_id(), channel=channel)
        self.repo.add_channel(channel, post)
        self.notifier(post)
        return post
class AsyncChannelDispatcher:
    """Fans channel posts out to per-subscriber asyncio queues."""
    def __init__(self, service: ConcurrentChatService) -> None:
        self.service = service
        # channel -> {user -> delivery queue}
        self.subscribers: Dict[str, Dict[str, asyncio.Queue[Message]]] = defaultdict(dict)
    async def subscribe(self, user: str, channel: str) -> asyncio.Queue[Message]:
        """Register *user* on *channel* and return their delivery queue."""
        mailbox: asyncio.Queue[Message] = asyncio.Queue()
        self.subscribers[channel][user] = mailbox
        return mailbox
    async def publish(self, sender: str, channel: str, body: str) -> Message:
        """Persist the post via the service, then enqueue it for every subscriber."""
        post = self.service.send_to_channel(sender, channel, body)
        for mailbox in self.subscribers[channel].values():
            await mailbox.put(post)
        return post
# --- Level 3 resiliency additions ---
@dataclass
class PendingDelivery:
    """An unacknowledged channel delivery awaiting an ack or redelivery."""
    message: Message
    user: str  # the subscriber whose ack is outstanding
    attempts: int = 0  # redelivery attempts so far (the initial send is not counted)
class ReliableChannelDispatcher(AsyncChannelDispatcher):
    """Channel dispatcher with per-subscriber ack tracking and redelivery.

    Each publish records one ``PendingDelivery`` per subscriber; consumers
    call :meth:`ack` to clear theirs.  :meth:`deliver_pending` re-enqueues
    whatever is still unacked, and after ``max_attempts`` redeliveries moves
    the entry to ``dead_letters``.
    """
    def __init__(self, service: ConcurrentChatService, max_attempts: int = 3) -> None:
        super().__init__(service)
        self.max_attempts = max_attempts
        # "message_id:user" -> outstanding delivery awaiting ack.
        self.pending: Dict[str, PendingDelivery] = {}
        self.dead_letters: List[PendingDelivery] = []
    def _key(self, message_id: str, user: str) -> str:
        # Composite key: one pending entry per (message, subscriber) pair.
        return f"{message_id}:{user}"
    async def publish(self, sender: str, channel: str, body: str) -> Message:
        """Publish via the base dispatcher, then track an ack per subscriber."""
        message = await super().publish(sender, channel, body)
        for user in self.subscribers[channel]:
            if message.message_id is None:
                continue  # cannot track acks without a message id
            self.pending[self._key(message.message_id, user)] = PendingDelivery(message, user)
        return message
    def ack(self, message_id: str, user: str) -> None:
        """Mark the delivery as processed; unknown keys are ignored."""
        self.pending.pop(self._key(message_id, user), None)
    async def deliver_pending(self) -> None:
        """Re-enqueue every still-unacked delivery and count the attempt.

        NOTE(review): an entry that reaches ``max_attempts`` is dead-lettered
        *after* this final redelivery was already queued, so the subscriber
        may still receive (and even ack) a message recorded as dead —
        confirm this is the intended semantics.
        """
        for key, delivery in list(self.pending.items()):
            channel = delivery.message.channel
            assert channel is not None
            queue = self.subscribers[channel][delivery.user]
            await queue.put(delivery.message)
            delivery.attempts += 1
            if delivery.attempts >= self.max_attempts:
                self.dead_letters.append(delivery)
                del self.pending[key]
async def main_async() -> None:
    """Demo: bob withholds his first ack, so the dispatcher redelivers.

    The consumers run as a background gather *while* redelivery happens.
    The original awaited ``gather`` before calling ``deliver_pending``,
    which deadlocks: bob blocks on his second ``get()`` and the redelivery
    that would satisfy it never runs.
    """
    repo = ChannelRepository()
    dispatcher = ReliableChannelDispatcher(ConcurrentChatService(repo, lambda message: None))
    incidents_alice = await dispatcher.subscribe("alice", "#incidents")
    incidents_bob = await dispatcher.subscribe("bob", "#incidents")
    async def alice_consumer() -> None:
        msg = await incidents_alice.get()
        print("alice acked", msg.body)
        if msg.message_id is not None:
            dispatcher.ack(msg.message_id, "alice")
    async def bob_consumer() -> None:
        deliveries = 0
        while deliveries < 2:
            msg = await incidents_bob.get()
            deliveries += 1
            print("bob attempt", deliveries, "for", msg.body)
            if deliveries == 2 and msg.message_id is not None:
                dispatcher.ack(msg.message_id, "bob")
    consumers = asyncio.gather(alice_consumer(), bob_consumer())
    await dispatcher.publish("service-bot", "#incidents", "incident #1234")
    await asyncio.sleep(0.05)  # let first deliveries land; bob skips his ack
    await dispatcher.deliver_pending()  # redeliver bob's unacked message
    await consumers
    if dispatcher.dead_letters:
        print("Dead letters:", [(d.user, d.message.body) for d in dispatcher.dead_letters])
if __name__ == "__main__":
    asyncio.run(main_async())
Progressively build the file storage system from a core store to concurrent upload handling and resilient replication layers.
File Storage Stack ├─ Level 1: Core Store ├─ Level 2: Concurrent Uploads └─ Level 3: Resilient Replication
Level 1 — Core Implementation
Create a Dropbox-style service that stores files with version history. Sample Input: upload("notes.txt","v1"), upload("notes.txt","v2"). Sample Output: Two versions stored, latest retrievable.
from __future__ import annotations
from dataclasses import dataclass
from typing import DefaultDict, Dict, List
@dataclass
class FileVersion:
    """One immutable snapshot of a file's content."""
    version: int  # 1-based, assigned in upload order
    content: str


class FileRepository:
    """Stores every uploaded version of each file, keyed by path."""
    def __init__(self) -> None:
        # The original called `DefaultDict(list)`, instantiating the *typing*
        # alias (deprecated); use the real collections.defaultdict instead.
        from collections import defaultdict  # local: module header imports only typing
        self.store: DefaultDict[str, List[FileVersion]] = defaultdict(list)
    def add_version(self, path: str, content: str) -> FileVersion:
        """Append *content* as the next version of *path* and return it."""
        versions = self.store[path]
        version = FileVersion(len(versions) + 1, content)
        versions.append(version)
        return version
    def latest(self, path: str) -> FileVersion:
        """Return the newest version; raises IndexError for unknown paths."""
        return self.store[path][-1]
    def history(self, path: str) -> List[FileVersion]:
        """Return the full version history, oldest first.

        Returns a copy so callers cannot mutate the repository's internal
        list (the original handed out the live list).
        """
        return list(self.store[path])
def main() -> None:
    """Demo: store two versions of one file and inspect them."""
    repo = FileRepository()
    for content in ("Version 1", "Version 2"):
        repo.add_version("notes.txt", content)
    print("Latest:", repo.latest("notes.txt"))
    print("History:", repo.history("notes.txt"))
if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Handle concurrent file updates from multiple devices with optimistic concurrency control. Sample Input: Two threads uploading new versions concurrently. Sample Output: Conflicting updates detected and merged version created.
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Dict, List, Optional
@dataclass
class VersionedDocument:
    """A single numbered revision of a document."""
    version: int
    content: str


class VersionRepository:
    """Thread-safe, append-only revision history per document path."""
    def __init__(self) -> None:
        self.versions: Dict[str, List[VersionedDocument]] = {"notes.txt": [VersionedDocument(1, "Base")]}
        self.lock = threading.Lock()
    def latest(self, path: str) -> VersionedDocument:
        """Return the most recent revision of *path*."""
        return self.versions[path][-1]
    def append(self, path: str, content: str) -> VersionedDocument:
        """Atomically add *content* as the next revision and return it."""
        with self.lock:
            head = self.latest(path)
            revision = VersionedDocument(head.version + 1, content)
            self.versions[path].append(revision)
            return revision
class ConflictResolver:
    """Merges two concurrent edits by stacking them beneath the base text."""
    def merge(self, base: str, change_a: str, change_b: str) -> str:
        """Return a naive line-wise merge of both changes on top of *base*."""
        return "\n".join((base, change_a, change_b))
def worker(repo: VersionRepository, resolver: ConflictResolver, change: str) -> None:
    """Read the latest revision, merge this device's change, and append it."""
    head = repo.latest("notes.txt")  # read happens outside the lock: optimistic
    repo.append("notes.txt", resolver.merge(head.content, change, ""))
def main() -> None:
    """Demo: two devices merge their edits concurrently."""
    repo = VersionRepository()
    resolver = ConflictResolver()
    threads = [
        threading.Thread(target=worker, args=(repo, resolver, change))
        for change in ("DeviceA change", "DeviceB change")
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    for doc in repo.versions["notes.txt"]:
        print(doc)
if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Replicate file versions across regions with checksum validation and replay failed replications. Sample Input: Upload file while one replica fails. Sample Output: Fallback replica triggered and replication retried until success.
from __future__ import annotations
import hashlib
import random
import threading
import time
from dataclasses import dataclass
from queue import Queue
from typing import Dict
@dataclass
class FileBlob:
    """An immutable stored file version with its integrity checksum."""
    version: int
    content: str
    checksum: str  # SHA-256 hex digest of ``content``


class PrimaryStore:
    """Authoritative store; assigns version numbers and checksums on save."""
    def __init__(self) -> None:
        self.files: Dict[str, FileBlob] = {}
    def save(self, path: str, content: str) -> FileBlob:
        """Store *content* under *path* as the next version and return the blob."""
        digest = hashlib.sha256(content.encode()).hexdigest()
        previous = self.files.get(path)
        next_version = (previous.version if previous else 0) + 1
        blob = FileBlob(next_version, content, digest)
        self.files[path] = blob
        return blob
class ReplicaStore:
    """A regional replica that fails randomly to simulate outages."""
    def __init__(self, name: str, failure_rate: float = 0.3) -> None:
        self.name = name
        self.failure_rate = failure_rate  # probability that replicate() raises
        self.files: Dict[str, FileBlob] = {}
    def replicate(self, path: str, blob: FileBlob) -> None:
        """Store *blob* under *path*, or raise RuntimeError on a simulated outage."""
        if random.random() < self.failure_rate:
            raise RuntimeError(f"{self.name} unavailable")
        self.files[path] = blob
class Replicator:
    """Ships blobs to every replica on a background thread, retrying failures.

    Queue items carry the set of replica names still awaiting the blob, so a
    retry only re-targets the replicas that actually failed.  The original
    requeued the whole (path, blob) once *per failed replica*, duplicating
    successful writes and multiplying queue items.
    """
    def __init__(self, replicas: Dict[str, ReplicaStore]) -> None:
        self.replicas = replicas
        # Items: (path, blob, replica names still pending).
        self.queue: "Queue[tuple[str, FileBlob, set[str]]]" = Queue()
        self._start()
    def _start(self) -> None:
        def loop() -> None:
            while True:
                path, blob, pending = self.queue.get()
                failed: set[str] = set()
                for name in pending:
                    replica = self.replicas[name]
                    try:
                        replica.replicate(path, blob)
                        print(replica.name, "replicated", path, "v", blob.version)
                    except Exception as exc:
                        print("Replication failed:", exc)
                        failed.add(name)
                if failed:
                    time.sleep(0.2)  # simple backoff before the retry pass
                    self.queue.put((path, blob, failed))
                self.queue.task_done()
        threading.Thread(target=loop, daemon=True).start()
    def enqueue(self, path: str, blob: FileBlob) -> None:
        """Queue *blob* for replication to all configured replicas."""
        self.queue.put((path, blob, set(self.replicas)))
def main() -> None:
    """Demo: replicate one blob; ReplicaA is flaky, ReplicaB mostly healthy."""
    random.seed(14)
    primary = PrimaryStore()
    replicas = {
        "replica-a": ReplicaStore("ReplicaA", 0.5),
        "replica-b": ReplicaStore("ReplicaB", 0.1),
    }
    replicator = Replicator(replicas)
    blob = primary.save("notes.txt", "critical data v1")
    replicator.enqueue("notes.txt", blob)
    time.sleep(1)  # give the background replication thread time to run
if __name__ == "__main__":
    main()
Progressively enhance the logging framework from basic appenders to async logging with resilient fallback pipelines.
Logging Framework Stack ├─ Level 1: Core Logger ├─ Level 2: Asynchronous Logger └─ Level 3: Resilient Logging Pipeline
Level 1 — Core Implementation
Create a logging framework supporting multiple appenders (console, memory) with configurable levels. Sample Input: log.info("hello"). Sample Output: Message emitted to all appenders >= INFO level.
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum, auto
from typing import List
class Level(Enum):
    """Log severity; ``auto()`` assigns ascending values, DEBUG lowest."""
    DEBUG = auto()
    INFO = auto()
    WARN = auto()
    ERROR = auto()


@dataclass
class LogRecord:
    """A single log event: severity plus message text."""
    level: Level
    message: str


class Appender:
    """Abstract log destination; subclasses implement ``append``."""
    def append(self, record: LogRecord) -> None:
        raise NotImplementedError


class ConsoleAppender(Appender):
    """Writes formatted records to stdout."""
    def append(self, record: LogRecord) -> None:
        print(f"[{record.level.name}] {record.message}")


class MemoryAppender(Appender):
    """Retains records in memory, e.g. for tests and inspection."""
    def __init__(self) -> None:
        self.records: List[LogRecord] = []
    def append(self, record: LogRecord) -> None:
        self.records.append(record)


class Logger:
    """Front end: filters by severity and fans records out to all appenders."""
    def __init__(self, level: Level, appenders: List[Appender]) -> None:
        self.level = level  # minimum severity that will be emitted
        self.appenders = appenders
    def log(self, level: Level, message: str) -> None:
        """Emit *message* to every appender if *level* passes the threshold."""
        if level.value < self.level.value:
            return  # below threshold: drop silently
        record = LogRecord(level, message)
        for sink in self.appenders:
            sink.append(record)
    def info(self, message: str) -> None:
        """Convenience wrapper for INFO-level logging."""
        self.log(Level.INFO, message)
def main() -> None:
    """Demo: one record fans out to the console and an in-memory appender."""
    captured = MemoryAppender()
    logger = Logger(Level.DEBUG, [ConsoleAppender(), captured])
    logger.info("Hello logging")
    print("Memory records:", captured.records)
if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Introduce an asynchronous logging dispatcher that buffers messages and writes via worker threads. Sample Input: 5 threads generating logs concurrently. Sample Output: Logs persisted without blocking producers.
import threading
from enum import Enum, auto
from queue import Queue
from typing import Callable
class Level(Enum):
    """Log severity; ``auto()`` assigns ascending values, DEBUG lowest."""
    DEBUG = auto()
    INFO = auto()
    WARN = auto()
    ERROR = auto()
class AsyncLogger:
    """Queue-backed logger: producers enqueue, a daemon thread writes to the sink."""
    def __init__(self, sink: Callable[[Level, str], None]) -> None:
        self.queue: "Queue[tuple[Level, str]]" = Queue()
        self.sink = sink
        self.stop = threading.Event()
        threading.Thread(target=self._worker, daemon=True).start()
    def log(self, level: Level, message: str) -> None:
        """Enqueue a record without blocking the caller."""
        self.queue.put((level, message))
    def _worker(self) -> None:
        # Drain loop; the timeout lets the thread notice `stop` between records.
        from queue import Empty  # local: the module header imports only Queue
        while not self.stop.is_set():
            try:
                level, message = self.queue.get(timeout=0.2)
            except Empty:
                # Catch only the expected timeout.  The original's bare
                # `except Exception` would also silently hide real bugs.
                continue
            self.sink(level, message)
            self.queue.task_done()
    def shutdown(self) -> None:
        """Block until all queued records are flushed, then stop the worker."""
        self.queue.join()
        self.stop.set()
def sink(level: Level, message: str) -> None:
    """Default sink: write the record to stdout."""
    print(f"[{level.name}] {message}")
def worker(logger: AsyncLogger, index: int) -> None:
    """Emit three INFO records tagged with this producer's index."""
    for sequence in range(3):
        logger.log(Level.INFO, f"Thread {index} message {sequence}")
def main() -> None:
    """Demo: five producer threads log through one async dispatcher."""
    logger = AsyncLogger(sink)
    producers = [threading.Thread(target=worker, args=(logger, index)) for index in range(5)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logger.shutdown()  # flush the queue before exiting
if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Ensure logs are not lost when the primary sink is down by applying backpressure, retries, and fallback storage. Sample Input: Flood of logs during sink outage. Sample Output: Logs temporarily buffered or written to fallback, then drained.
import random
import time
from collections import deque
from enum import Enum, auto
class Level(Enum):
    """Log severity for the resilient pipeline demo (reduced set of levels)."""
    INFO = auto()
    ERROR = auto()
class PrimarySink:
    """Primary log destination that fails randomly to simulate outages."""
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate  # probability that a write raises
    def write(self, level: Level, message: str) -> None:
        """Print the record, or raise RuntimeError on a simulated outage."""
        if random.random() < self.failure_rate:
            raise RuntimeError("primary sink down")
        print("Primary:", level.name, message)
class FallbackSink:
    """Always-available sink that also retains records for later draining."""
    def __init__(self) -> None:
        self.storage = []  # (level, message) pairs accepted during outages
    def write(self, level: Level, message: str) -> None:
        """Print and retain the record."""
        print("Fallback:", level.name, message)
        self.storage.append((level, message))
class ResilientLogger:
    """Buffers records and drains them to the primary sink, diverting failures."""
    def __init__(self, primary: PrimarySink, fallback: FallbackSink) -> None:
        self.primary = primary
        self.fallback = fallback
        # Bounded buffer: once 50 entries are queued the oldest are dropped.
        self.buffer: deque[tuple[Level, str]] = deque(maxlen=50)
    def log(self, level: Level, message: str) -> None:
        """Buffer the record and immediately attempt to drain the buffer."""
        self.buffer.append((level, message))
        self._drain()
    def _drain(self) -> None:
        # Drain only what is buffered right now; a record that fails on the
        # primary goes straight to the fallback instead of being retried.
        for _ in range(len(self.buffer)):
            level, message = self.buffer.popleft()
            try:
                self.primary.write(level, message)
            except Exception as exc:
                print("Primary failed:", exc)
                self.fallback.write(level, message)
def main() -> None:
    """Demo: roughly half the writes fail over to the fallback sink."""
    random.seed(15)
    logger = ResilientLogger(PrimarySink(0.5), FallbackSink())
    for index in range(10):
        logger.log(Level.INFO, f"event {index}")
        time.sleep(0.05)
if __name__ == "__main__":
    main()
Build out the event bus from simple pub/sub to concurrent handlers and resilient delivery with retries.
Event Bus Stack ├─ Level 1: Basic Publisher/Subscriber ├─ Level 2: Concurrent Event Processing └─ Level 3: Resilient Delivery Service
Level 1 — Core Implementation
Implement an in-memory pub/sub bus that allows publishers to emit events to multiple subscribers. Sample Input: publish("order.created"). Sample Output: Registered subscribers invoked with payload.
from collections import defaultdict
from typing import Callable, DefaultDict, List
class EventBus:
    """Minimal synchronous in-memory pub/sub."""
    def __init__(self) -> None:
        # topic -> handlers, in subscription order
        self.subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)
    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        """Register *handler* to receive every payload published on *topic*."""
        self.subscribers[topic].append(handler)
    def publish(self, topic: str, payload: dict) -> None:
        """Invoke each of *topic*'s handlers with *payload*, in order."""
        for deliver in list(self.subscribers[topic]):
            deliver(payload)
def main() -> None:
    """Demo: one event fans out to two subscribers."""
    topic = "order.created"
    bus = EventBus()
    for label in ("Analytics", "Billing"):
        # Bind `label` as a default to avoid the late-binding closure pitfall.
        bus.subscribe(topic, lambda payload, label=label: print(label, "saw", payload))
    bus.publish(topic, {"order_id": "O1", "amount": 100})
if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Extend the bus to persist messages per topic with acknowledgements. Sample Input: Subscriber processes 5 events, acking each. Sample Output: Messages redelivered if ack missing.
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple
class DurableBus:
    """Pub/sub bus that retains messages until the handler acknowledges them."""
    def __init__(self) -> None:
        self.queues: Dict[str, Deque[Tuple[int, dict]]] = defaultdict(deque)
        self.handlers: Dict[str, Callable[[int, dict], bool]] = {}
        self.offsets: Dict[str, int] = defaultdict(int)  # next offset per topic
    def subscribe(self, topic: str, handler: Callable[[int, dict], bool]) -> None:
        """Register the single consumer for *topic* (last registration wins)."""
        self.handlers[topic] = handler
    def publish(self, topic: str, payload: dict) -> None:
        """Assign the next offset, durably enqueue, then attempt delivery."""
        offset = self.offsets[topic]
        self.offsets[topic] = offset + 1
        self.queues[topic].append((offset, payload))
        self._dispatch(topic)
    def _dispatch(self, topic: str) -> None:
        # Redeliver from the head; stop at the first message left unacked so
        # ordering is preserved for the next dispatch attempt.
        handler = self.handlers.get(topic)
        if handler is None:
            return
        queue = self.queues[topic]
        remaining = len(queue)
        while remaining:
            remaining -= 1
            offset, payload = queue[0]
            if not handler(offset, payload):
                break
            queue.popleft()
def main() -> None:
    """Demo: odd offsets are left unacked and stay queued for redelivery."""
    bus = DurableBus()
    def handler(offset: int, payload: dict) -> bool:
        print("Processing", offset, payload)
        return offset % 2 == 0  # ack only even offsets
    bus.subscribe("metrics", handler)
    for value in range(5):
        bus.publish("metrics", {"value": value})
    print("Remaining queue:", list(bus.queues["metrics"]))
if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Add retry semantics with exponential backoff and dead-letter queues for poison messages. Sample Input: Events with handler failures. Sample Output: Retried events eventually succeed or are rerouted to the dead-letter topic.
import random
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple
class ResilientBroker:
    """Pub/sub broker with bounded retries, exponential backoff, and dead-lettering."""
    def __init__(self) -> None:
        self.handlers: Dict[str, Callable[[dict], None]] = {}
        # Each queued item is (offset, payload, attempts so far).
        self.queues: Dict[str, Deque[Tuple[int, dict, int]]] = defaultdict(deque)
        self.dead_letters: Dict[str, list] = defaultdict(list)
    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        """Register the consumer for *topic* (last registration wins)."""
        self.handlers[topic] = handler
    def publish(self, topic: str, payload: dict) -> None:
        """Enqueue *payload* and immediately attempt delivery."""
        queue = self.queues[topic]
        queue.append((len(queue), payload, 0))
        self._drain(topic)
    def _drain(self, topic: str) -> None:
        """Deliver queued messages, retrying failures with exponential backoff."""
        handler = self.handlers.get(topic)
        if handler is None:
            # No subscriber yet: keep messages queued instead of burning
            # retries on a guaranteed TypeError (the original called
            # ``None(payload)`` and eventually dead-lettered every message).
            return
        queue = self.queues[topic]
        pending = len(queue)
        for _ in range(pending):
            offset, payload, attempt = queue.popleft()
            try:
                handler(payload)
            except Exception as exc:
                attempt += 1
                if attempt >= 3:
                    print("Dead-lettering", payload)
                    self.dead_letters[topic].append((payload, str(exc)))
                else:
                    backoff = 0.1 * (2 ** attempt)
                    print("Retrying", payload, "in", backoff)
                    time.sleep(backoff)
                    queue.append((offset, payload, attempt))
def main() -> None:
    """Demo: a flaky handler exercises retries and dead-lettering."""
    random.seed(16)
    broker = ResilientBroker()
    def handler(payload: dict) -> None:
        if random.random() < 0.5:
            raise RuntimeError("handler failure")
        print("Handled", payload)
    broker.subscribe("notifications", handler)
    for event_id in range(4):
        broker.publish("notifications", {"id": event_id})
    print("Dead letters:", broker.dead_letters["notifications"])
if __name__ == "__main__":
    main()
Grow the workflow orchestrator from basic sequencing to concurrent coordination and resilient recovery.
Workflow Orchestrator Stack ├─ Level 1: Core Orchestrator ├─ Level 2: Concurrent Workflow Engine └─ Level 3: Resilient Workflow Service
Level 1 — Core Implementation
Execute tasks defined as a DAG, ensuring dependencies are respected. Sample Input: DAG A->B, A->C, B->D. Sample Output: Tasks executed in valid order.
from collections import defaultdict, deque
from typing import Callable, Dict, List, Set
class Workflow:
    """Runs tasks in topological (Kahn) order over a dependency DAG."""
    def __init__(self) -> None:
        self.graph: Dict[str, Set[str]] = defaultdict(set)    # prereq -> dependents
        self.reverse: Dict[str, Set[str]] = defaultdict(set)  # task -> prereqs
        self.tasks: Dict[str, Callable[[], None]] = {}
    def add_task(self, name: str, fn: Callable[[], None]) -> None:
        """Register task *name* with its zero-argument callable."""
        self.tasks[name] = fn
    def add_edge(self, prereq: str, task: str) -> None:
        """Require *prereq* to finish before *task* may run."""
        self.graph[prereq].add(task)
        self.reverse[task].add(prereq)
    def run(self) -> None:
        """Execute every task exactly once, respecting dependencies.

        Raises:
            ValueError: if the graph contains a cycle.  The original left
                cyclic tasks silently unexecuted, masking misconfiguration.
        """
        indegree: Dict[str, int] = {task: len(self.reverse[task]) for task in self.tasks}
        ready = deque(task for task, degree in indegree.items() if degree == 0)
        executed = 0
        while ready:
            task = ready.popleft()
            self.tasks[task]()
            executed += 1
            for dependent in self.graph[task]:
                indegree[dependent] -= 1
                if indegree[dependent] == 0:
                    ready.append(dependent)
        if executed != len(self.tasks):
            raise ValueError("workflow graph contains a cycle")
def main() -> None:
    """Demo: run a four-task DAG in dependency order."""
    wf = Workflow()
    for name in "ABCD":
        wf.add_task(name, lambda name=name: print(name))
    for prereq, task in (("A", "B"), ("A", "C"), ("B", "D")):
        wf.add_edge(prereq, task)
    wf.run()
if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Execute workflows using a thread pool, running independent branches concurrently. Sample Input: DAG with parallel branches. Sample Output: Tasks executed with concurrency, respecting dependencies.
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from typing import Callable, Dict, Set
class ConcurrentWorkflow:
    """Executes a task DAG on a thread pool, running independent branches in parallel."""
    def __init__(self) -> None:
        self.dependencies: Dict[str, Set[str]] = defaultdict(set)  # task -> prereqs
        self.dependents: Dict[str, Set[str]] = defaultdict(set)    # prereq -> tasks
        self.tasks: Dict[str, Callable[[], None]] = {}
    def add_task(self, name: str, fn: Callable[[], None]) -> None:
        """Register task *name* with its zero-argument callable."""
        self.tasks[name] = fn
    def add_edge(self, prereq: str, task: str) -> None:
        """Require *prereq* to finish before *task* may run."""
        self.dependencies[task].add(prereq)
        self.dependents[prereq].add(task)
    def run(self) -> None:
        """Run all tasks, scheduling each as soon as its prerequisites finish.

        The original mutated the shared ``completed`` set without a lock (two
        prerequisites finishing together could double-schedule a dependent)
        and called ``executor.shutdown()`` while workers were still submitting
        follow-up tasks, which raises RuntimeError inside the worker and
        silently drops those tasks.  Here a lock guards the bookkeeping, a
        ``scheduled`` set makes scheduling idempotent, and an event delays
        shutdown until no work remains in flight.
        """
        import threading  # local: the module header does not import threading
        completed: Set[str] = set()
        scheduled: Set[str] = set()
        gate = threading.Lock()
        all_done = threading.Event()
        with ThreadPoolExecutor(max_workers=4) as executor:
            inflight = 0  # submitted-but-unfinished tasks, guarded by `gate`
            def execute_task(task: str) -> None:
                nonlocal inflight
                self.tasks[task]()
                with gate:
                    inflight -= 1
                    completed.add(task)
                    runnable = [
                        nxt
                        for nxt in self.dependents[task]
                        if nxt not in scheduled and self.dependencies[nxt] <= completed
                    ]
                    scheduled.update(runnable)
                    inflight += len(runnable)
                    idle = inflight == 0
                for nxt in runnable:
                    executor.submit(execute_task, nxt)
                if idle:
                    # Nothing left in flight: everything reachable has run
                    # (tasks on a cycle are skipped, matching the original).
                    all_done.set()
            roots = [task for task in self.tasks if not self.dependencies[task]]
            if not roots:
                return  # empty or fully cyclic graph: nothing runnable
            with gate:
                scheduled.update(roots)
                inflight = len(roots)
            for task in roots:
                executor.submit(execute_task, task)
            all_done.wait()
def main() -> None:
    """Demo: B and C can run in parallel between A and D."""
    wf = ConcurrentWorkflow()
    for name in "ABCD":
        wf.add_task(name, lambda name=name: print(name))
    for prereq, task in (("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")):
        wf.add_edge(prereq, task)
    wf.run()
if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Persist workflow state, retry failed steps with compensation hooks, and resume from last checkpoint after crash. Sample Input: Workflow with flaky step. Sample Output: Step retried, compensated on failure, workflow state persisted.
import json
import os
import random
import time
from typing import Callable, Dict
class StateStore:
    """Persists workflow step statuses as a JSON document on disk."""
    def __init__(self, path: str) -> None:
        self.path = path
        # Seed an empty state file so load() always succeeds.
        if not os.path.exists(path):
            self.save({})
    def load(self) -> Dict[str, str]:
        """Read and return the persisted step-status mapping."""
        with open(self.path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    def save(self, state: Dict[str, str]) -> None:
        """Overwrite the state file with *state*."""
        with open(self.path, "w", encoding="utf-8") as handle:
            handle.write(json.dumps(state))
class SagaStep:
    """Pairs a saga step's forward action with its compensating (undo) action."""
    def __init__(self, execute: Callable[[], None], compensate: Callable[[], None]) -> None:
        # `execute` performs the step; `compensate` undoes it after retries are exhausted.
        self.execute = execute
        self.compensate = compensate
class ResilientSaga:
    """Runs saga steps in order with retries, checkpointing, and compensation."""
    def __init__(self, steps: Dict[str, SagaStep], store: StateStore) -> None:
        self.steps = steps
        self.store = store
        self.state = store.load()  # resume from the last persisted checkpoint
    def run(self) -> None:
        """Execute each unfinished step, retrying up to three times.

        On the final failure the step is marked "failed", its compensation
        hook runs, and the original exception propagates to the caller.
        """
        for name, step in self.steps.items():
            if self.state.get(name) == "done":
                continue  # checkpointed on a previous run
            for attempt in range(1, 4):
                try:
                    step.execute()
                except Exception:
                    if attempt >= 3:
                        self.state[name] = "failed"
                        self.store.save(self.state)
                        step.compensate()
                        raise
                    time.sleep(0.2 * attempt)  # linear backoff before retrying
                else:
                    self.state[name] = "done"
                    self.store.save(self.state)
                    break
def main() -> None:
    """Demo: the payment step is flaky; exhausted retries trigger compensation."""
    random.seed(17)
    def flaky_step() -> None:
        # Fails roughly half the time to exercise the retry loop.
        if random.random() < 0.5:
            raise RuntimeError("flaky")
        print("Executed flaky step")
    steps = {
        "reserve": SagaStep(lambda: print("Reserved inventory"), lambda: print("Released inventory")),
        "pay": SagaStep(flaky_step, lambda: print("Refunded payment")),
        "notify": SagaStep(lambda: print("Notification sent"), lambda: print("Notification compensating")),
    }
    saga = ResilientSaga(steps, StateStore("/tmp/workflow-state.json"))
    try:
        saga.run()
    except Exception as exc:
        print("Saga ended with failure:", exc)
if __name__ == "__main__":
    main()
Progressively enhance the feed aggregation system from simple RSS pulling to concurrent pipelines and resilient aggregation with fallbacks.
Feed Aggregator Stack ├─ Level 1: RSS Puller ├─ Level 2: Concurrent Fetch Pipeline └─ Level 3: Resilient Aggregation Service
Level 1 — Core Implementation
Aggregate posts from multiple users into a single timeline sorted by time. Sample Input: posts from followed users. Sample Output: Combined feed in descending order.
import heapq
from typing import Dict, List, Optional, Tuple
class FeedService:
    """Builds a user's timeline by merging their followees' posts, newest first."""
    def __init__(self, follows: Dict[str, List[str]], posts: Dict[str, List[Tuple[int, str]]]) -> None:
        self.follows = follows  # user -> followees
        self.posts = posts      # author -> [(timestamp, content), ...]
    def feed(self, user: str) -> List[Tuple[int, str]]:
        """Return all followees' posts sorted by descending timestamp.

        Ties on timestamp are broken by content in ascending order, matching
        the original heap ordering of (-timestamp, content) pairs.
        """
        collected = [
            (-timestamp, content)
            for followee in self.follows.get(user, [])
            for timestamp, content in self.posts.get(followee, [])
        ]
        collected.sort()
        return [(-negated, content) for negated, content in collected]
def main() -> None:
    """Demo: alice's merged timeline from bob and carol."""
    follows = {"alice": ["bob", "carol"]}
    posts = {
        "bob": [(3, "bob post 1"), (1, "bob post 0")],
        "carol": [(2, "carol post")],
    }
    print("Feed:", FeedService(follows, posts).feed("alice"))
if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Maintain cached timelines using fan-out-on-write strategy with background refresh. Sample Input: Users posting new content. Sample Output: Followers' caches updated asynchronously.
import threading
import time
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple
class FanoutFeed:
    """Fan-out-on-write feed: publishes are pushed into follower caches."""
    def __init__(self, follows: Dict[str, List[str]]) -> None:
        self.follows = follows  # user -> list of followees
        # follower -> cached timeline, newest first, capped at 10 entries
        self.cache: Dict[str, Deque[Tuple[int, str]]] = defaultdict(deque)
        self.lock = threading.Lock()
    def publish(self, author: str, content: str, timestamp: int) -> None:
        """Push the new post to every follower's cached timeline."""
        audience = [user for user, followees in self.follows.items() if author in followees]
        for follower in audience:
            with self.lock:
                timeline = self.cache[follower]
                timeline.appendleft((timestamp, content))
                while len(timeline) > 10:
                    timeline.pop()  # evict the oldest entry
    def timeline(self, user: str) -> List[Tuple[int, str]]:
        """Return a snapshot of the user's cached timeline."""
        with self.lock:
            return list(self.cache[user])
def main() -> None:
    """Demo: publishes fan out to follower caches on write."""
    feed = FanoutFeed({"alice": ["bob"], "eve": ["bob", "alice"]})
    feed.publish("bob", "hello world", 3)
    feed.publish("alice", "hi there", 4)
    print("Alice timeline:", feed.timeline("alice"))
    print("Eve timeline:", feed.timeline("eve"))
if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Ensure feed requests succeed even when the primary store fails by layering cache, fallback to stale data, and eventual refresh. Sample Input: Feed requests during store outage. Sample Output: Stale feed served with log of degradation.
import random
import time
from typing import Dict, List, Optional, Tuple
class PrimaryFeedStore:
    """Feed store that fails randomly to simulate outages."""
    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate  # probability that a read raises
        self.feeds: Dict[str, List[Tuple[int, str]]] = {}
    def get(self, user: str) -> List[Tuple[int, str]]:
        """Return the user's feed, or raise RuntimeError on a simulated outage."""
        if random.random() < self.failure_rate:
            raise RuntimeError("feed store down")
        return self.feeds.get(user, [])
class ResilientFeedService:
    """Serves feeds from the primary store, degrading to the last good copy."""
    def __init__(self, primary: PrimaryFeedStore) -> None:
        self.primary = primary
        # Last successfully fetched feed per user.  NOTE(review): `stale`
        # always mirrors `cache` in this implementation — presumably the two
        # maps were meant to expire independently; confirm before merging them.
        self.cache: Dict[str, List[Tuple[int, str]]] = {}
        self.stale: Dict[str, List[Tuple[int, str]]] = {}
    def feed(self, user: str) -> List[Tuple[int, str]]:
        """Fetch the user's feed, falling back to cached data on failure."""
        try:
            fresh = self.primary.get(user)
        except Exception as exc:
            print("Primary failed, serving stale feed:", exc)
            # An empty cached feed (falsy) falls through to `stale`, which
            # holds the same data, so the served result is unchanged.
            return self.cache.get(user) or self.stale.get(user, [])
        self.cache[user] = fresh
        self.stale[user] = fresh
        return fresh
def main() -> None:
    """Demo: repeated reads during a partial outage fall back to cached data."""
    random.seed(19)
    store = PrimaryFeedStore(0.6)
    store.feeds["alice"] = [(5, "new post"), (3, "older post")]
    service = ResilientFeedService(store)
    for _ in range(4):
        print("Feed response:", service.feed("alice"))
        time.sleep(0.1)
if __name__ == "__main__":
    main()
Improve the metrics collector from a simple in-memory store to concurrent shards and a resilient pipeline with buffering/fallback.
Metrics Collector Stack ├─ Level 1: In-Memory Collector ├─ Level 2: Concurrent Sharded Collector └─ Level 3: Resilient Metrics Pipeline
Level 1 — Core Implementation
Implement a counter service that increments and reads named metrics. Sample Input: inc("requests", 3). Sample Output: Counter returns aggregated value.
from collections import defaultdict
from typing import Dict
class CounterService:
    """Named integer counters supporting aggregate increments."""

    def __init__(self) -> None:
        self.counters: Dict[str, int] = defaultdict(int)

    def increment(self, name: str, value: int = 1) -> None:
        """Add value (default 1) to the named counter."""
        self.counters[name] += value

    def get(self, name: str) -> int:
        """Return the current value of the named counter (0 if unseen).

        Uses .get instead of indexing: indexing the defaultdict would
        insert a 0 entry for every unseen name, mutating the store on read.
        """
        return self.counters.get(name, 0)
def main() -> None:
    """Demo: aggregate two increments on one named counter."""
    service = CounterService()
    service.increment("requests")
    service.increment("requests", 2)
    print("Requests:", service.get("requests"))


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Compute rolling window metrics for high-volume counters using thread-safe structures. Sample Input: Concurrent increments with timestamps. Sample Output: Rolling sum over last N seconds computed correctly.
from __future__ import annotations
import threading
import time
from collections import deque
from typing import Deque, Tuple
class RollingCounter:
    """Thread-safe counter summing events inside a sliding time window."""

    def __init__(self, window_seconds: int) -> None:
        self.window = window_seconds
        self.events: Deque[Tuple[float, int]] = deque()
        self.lock = threading.Lock()

    def _trim(self, now: float) -> None:
        """Drop events older than the window. Caller must hold the lock."""
        cutoff = now - self.window
        while self.events and self.events[0][0] < cutoff:
            self.events.popleft()

    def increment(self, value: int = 1) -> None:
        """Record an event stamped with the current time."""
        stamp = time.time()
        with self.lock:
            self.events.append((stamp, value))
            self._trim(stamp)

    def total(self) -> int:
        """Sum of all event values still inside the window."""
        stamp = time.time()
        with self.lock:
            self._trim(stamp)
            return sum(v for _, v in self.events)
def worker(counter: RollingCounter) -> None:
    """Thread target: emit five spaced increments on the shared counter."""
    for _tick in range(5):
        counter.increment()
        time.sleep(0.05)
def main() -> None:
    """Demo: three workers feed one rolling counter, then report the sum."""
    shared = RollingCounter(1)
    pool = [threading.Thread(target=worker, args=(shared,)) for _ in range(3)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
    print("Rolling total:", shared.total())


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Aggregate metrics from edge nodes and reliably ship them to a central store with batching, retries, and fallback to disk. Sample Input: Metric batches with simulated failures. Sample Output: Successful shipments logged, failures retried or persisted locally.
import json
import os
import random
import time
from typing import Dict, List, Optional
class CentralTransport:
    """Transport stub that ships metric batches, failing at a set rate."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def ship(self, batch: List[Dict[str, int]]) -> None:
        """Print the batch as shipped, or raise RuntimeError on failure."""
        delivered = random.random() >= self.failure_rate
        if not delivered:
            raise RuntimeError("transport failure")
        print("Shipped batch:", batch)
class FallbackDisk:
    """Append-only JSON-lines persistence for batches that failed to ship."""

    def __init__(self, path: str) -> None:
        self.path = path

    def persist(self, batch: List[Dict[str, int]]) -> None:
        """Append the batch as one JSON line to the fallback file."""
        line = json.dumps(batch)
        with open(self.path, "a", encoding="utf-8") as sink:
            sink.write(line + "\n")
class ResilientAggregator:
    """Buffers metric dicts and ships them in batches with disk fallback.

    record() flushes automatically once a full batch accumulates; _flush()
    may also be called manually to drain a partial batch. Flushing an empty
    buffer is a no-op — the original shipped (or persisted) an empty list
    when main() called _flush() with nothing buffered.
    """

    def __init__(self, transport: CentralTransport, fallback: FallbackDisk, batch_size: int = 3) -> None:
        self.transport = transport
        self.fallback = fallback
        self.batch_size = batch_size
        self.buffer: List[Dict[str, int]] = []

    def record(self, metrics: Dict[str, int]) -> None:
        """Buffer one metrics dict, flushing once a batch is full."""
        self.buffer.append(metrics)
        if len(self.buffer) >= self.batch_size:
            self._flush()

    def _flush(self) -> None:
        """Ship the oldest batch; persist it to disk if the transport fails."""
        batch = self.buffer[: self.batch_size]
        if not batch:
            return  # nothing buffered — do not ship an empty batch
        try:
            self.transport.ship(batch)
        except Exception as exc:
            # Best-effort durability: keep the failed batch on disk.
            print("Ship failed:", exc, "persisting locally")
            self.fallback.persist(batch)
        finally:
            # Drop the batch whether it was shipped or persisted.
            self.buffer = self.buffer[self.batch_size :]
def main() -> None:
    """Demo: seven metric records through a flaky transport with fallback."""
    random.seed(20)
    pipeline = ResilientAggregator(CentralTransport(0.5), FallbackDisk("/tmp/metrics.log"))
    for sequence in range(7):
        pipeline.record({"requests": sequence})
    pipeline._flush()


if __name__ == "__main__":
    main()
Build the document collaboration platform from basic storage to concurrent sessions and resilient synchronization.
Doc Collaboration Stack ├─ Level 1: Core Document Store ├─ Level 2: Concurrent Editing Sessions └─ Level 3: Resilient Collaboration Service
Level 1 — Core Implementation
Implement a collaborative document that applies insert/delete operations sequentially. Sample Input: apply insert(0,"Hi"), delete(1). Sample Output: Final text matches operations replay.
from dataclasses import dataclass
from typing import List
@dataclass
class Operation:
    """One logged edit: insert `payload` at `index`, or delete at `index`."""

    kind: str  # "insert" or "delete" (as appended by Document methods)
    index: int  # character position the operation applies at
    payload: str = ""  # inserted text; unused for deletes
class Document:
    """Plain-text document that records every edit in an operation log."""

    def __init__(self) -> None:
        self.text = ""
        self.log: List[Operation] = []

    def insert(self, index: int, payload: str) -> None:
        """Insert payload at index and log the operation."""
        prefix, suffix = self.text[:index], self.text[index:]
        self.text = prefix + payload + suffix
        self.log.append(Operation("insert", index, payload))

    def delete(self, index: int) -> None:
        """Remove the single character at index and log the operation."""
        self.text = self.text[:index] + self.text[index + 1 :]
        self.log.append(Operation("delete", index))

    def replay(self) -> str:
        """Rebuild the text from scratch by re-applying the whole log."""
        rebuilt = ""
        for entry in self.log:
            if entry.kind == "insert":
                rebuilt = rebuilt[: entry.index] + entry.payload + rebuilt[entry.index :]
            elif entry.kind == "delete":
                rebuilt = rebuilt[: entry.index] + rebuilt[entry.index + 1 :]
        return rebuilt
def main() -> None:
    """Demo: edit a document, then verify replaying the log matches."""
    page = Document()
    page.insert(0, "H")
    page.insert(1, "i")
    page.insert(2, "!")
    page.delete(2)
    print("Text:", page.text)
    print("Replay:", page.replay())


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Support concurrent edits from multiple sites using a simple observed-remove CRDT for characters. Sample Input: Two threads editing same doc. Sample Output: Merged document contains both edits deterministically.
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Dict, Tuple
@dataclass
class CharacterVersion:
    """Latest known state of one character slot in the CRDT document."""

    char: str  # the character; "" once the slot is deleted
    clock: Tuple[str, int]  # (site id, counter) version; compared lexicographically, higher wins
    tombstone: bool = False  # True when the slot has been deleted
class CRDTDocument:
    """Last-writer-wins character map merged deterministically across sites."""

    def __init__(self) -> None:
        self.characters: Dict[int, CharacterVersion] = {}
        self.lock = threading.Lock()

    def apply(self, position: int, char: str, clock: Tuple[str, int]) -> None:
        """Set the character at position only if clock is strictly newer."""
        with self.lock:
            current = self.characters.get(position)
            if current is None or clock > current.clock:
                self.characters[position] = CharacterVersion(char, clock)

    def delete(self, position: int, clock: Tuple[str, int]) -> None:
        """Tombstone the slot; on a clock tie the delete wins (>=)."""
        with self.lock:
            current = self.characters.get(position)
            if current is None or clock >= current.clock:
                self.characters[position] = CharacterVersion("", clock, tombstone=True)

    def materialize(self) -> str:
        """Render the live characters in position order."""
        with self.lock:
            live = [
                entry.char
                for _, entry in sorted(self.characters.items())
                if not entry.tombstone
            ]
        return "".join(live)
def worker(doc: CRDTDocument, author: str, edits: Tuple[int, str]) -> None:
    """Thread target: apply one (position, char) edit stamped with the author's clock."""
    position, char = edits
    doc.apply(position, char, (author, position))
def main() -> None:
    """Demo: two sites write concurrently; the merge is deterministic."""
    shared = CRDTDocument()
    editors = [
        threading.Thread(target=worker, args=(shared, "siteA", (0, "H"))),
        threading.Thread(target=worker, args=(shared, "siteB", (1, "i"))),
    ]
    for editor in editors:
        editor.start()
    for editor in editors:
        editor.join()
    print("Merged doc:", shared.materialize())


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Keep collaboration working offline by queueing local operations and replaying them to the server with retries. Sample Input: Offline edits replayed once connection restored. Sample Output: Operations applied in order and acknowledgements logged.
import random
import time
from dataclasses import dataclass
from queue import Queue, Empty
@dataclass
class EditOp:
    """A queued edit destined for the remote store."""

    op_id: str  # identifier used in the sync/ack log lines
    position: int  # insertion offset in the remote buffer
    payload: str  # text inserted at that position
class RemoteStore:
    """Remote document endpoint that randomly rejects incoming edits."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate
        self.buffer = ""

    def apply(self, op: EditOp) -> None:
        """Splice the op's payload into the buffer, or raise on a network error."""
        if random.random() < self.failure_rate:
            raise RuntimeError("network error")
        head = self.buffer[: op.position]
        tail = self.buffer[op.position :]
        self.buffer = head + op.payload + tail
        print("Remote applied", op.op_id, "->", self.buffer)
class OfflineClient:
    """Queues edits locally and replays them to the remote store on sync."""

    def __init__(self, store: RemoteStore) -> None:
        self.store = store
        self.queue: "Queue[EditOp]" = Queue()

    def edit(self, op: EditOp) -> None:
        """Record an edit locally; it will be pushed on the next sync()."""
        print("Queued", op)
        self.queue.put(op)

    def sync(self) -> None:
        """Drain the queue once; failed ops are re-queued for a later sync."""
        failed = []
        while True:
            try:
                waiting = self.queue.get_nowait()
            except Empty:
                break
            try:
                self.store.apply(waiting)
            except Exception as exc:
                print("Sync failed for", waiting.op_id, exc)
                failed.append(waiting)
            finally:
                self.queue.task_done()
        for waiting in failed:
            self.queue.put(waiting)
def main() -> None:
    """Demo: queue edits offline, then sync with retries until applied."""
    random.seed(21)
    remote = RemoteStore(0.5)
    editor = OfflineClient(remote)
    editor.edit(EditOp("op1", 0, "Hi"))
    editor.edit(EditOp("op2", 2, "!"))
    for _round in range(5):
        editor.sync()
        time.sleep(0.1)


if __name__ == "__main__":
    main()
Progressively enhance the rule engine from core evaluation to concurrent processing and a resilient rules service.
Rule Engine Stack ├─ Level 1: Core Rule Evaluator ├─ Level 2: Concurrent Rule Processing └─ Level 3: Resilient Rules Service
Level 1 — Core Implementation
Build a rule engine that loads condition-action pairs and evaluates them against an input. Sample Input: rules checking order amount. Sample Output: Matching rules execute actions.
from dataclasses import dataclass
from typing import Callable, Dict, List
@dataclass
class Rule:
    """A named condition/action pair evaluated against a context dict."""

    name: str  # label for the rule (not consulted during evaluation)
    condition: Callable[[Dict], bool]  # predicate deciding whether the action fires
    action: Callable[[Dict], None]  # side-effecting handler run on a match
class RuleEngine:
    """Ordered rule list; evaluation runs every matching rule's action."""

    def __init__(self) -> None:
        self.rules: List[Rule] = []

    def add_rule(self, rule: Rule) -> None:
        """Register a rule; rules are tried in registration order."""
        self.rules.append(rule)

    def evaluate(self, context: Dict) -> None:
        """Run the action of every rule whose condition accepts the context."""
        # Lazy generator keeps condition/action interleaving identical to a
        # plain loop: rule k's action runs before rule k+1's condition.
        for matched in (rule for rule in self.rules if rule.condition(context)):
            matched.action(context)
def main() -> None:
    """Demo: two rules fire against a single order context."""
    rules = RuleEngine()
    rules.add_rule(Rule("high-value", lambda ctx: ctx["amount"] > 100, lambda ctx: print("Flag high value")))
    rules.add_rule(Rule("vip", lambda ctx: ctx["customer"] == "vip", lambda ctx: print("Apply vip perks")))
    rules.evaluate({"amount": 150, "customer": "vip"})


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Allow rules to be reloaded at runtime without interrupting evaluation, using read/write locks for consistency. Sample Input: Reload rule set while evaluations run. Sample Output: Evaluations succeed with either old or new rule set safely.
from __future__ import annotations
import threading
from dataclasses import dataclass
from typing import Callable, Dict, List
@dataclass
class Rule:
    """Condition/action pair used by the hot-reloadable engine."""

    name: str  # label only; evaluation ignores it
    condition: Callable[[Dict], bool]  # predicate tested against the context
    action: Callable[[Dict], None]  # handler invoked when the condition holds
class HotReloadEngine:
    """Rule engine whose rule set can be swapped atomically at runtime."""

    def __init__(self) -> None:
        self.rules: List[Rule] = []
        self.lock = threading.RLock()

    def evaluate(self, context: Dict) -> None:
        """Run matching rules under the lock so a reload never interleaves."""
        with self.lock:
            for candidate in self.rules:
                if candidate.condition(context):
                    candidate.action(context)

    def reload(self, rules: List[Rule]) -> None:
        """Atomically replace the active rule set."""
        with self.lock:
            self.rules = rules
def evaluator(engine: HotReloadEngine, context: Dict) -> None:
    """Thread target: run one evaluation pass on the shared engine."""
    engine.evaluate(context)
def main() -> None:
    """Demo: reload the rule set while evaluations run on other threads."""
    engine = HotReloadEngine()
    engine.reload([Rule("gt100", lambda ctx: ctx["amount"] > 100, lambda ctx: print("Large order"))])
    runners = [
        threading.Thread(target=evaluator, args=(engine, {"amount": 150})),
        threading.Thread(target=evaluator, args=(engine, {"amount": 80})),
    ]
    for runner in runners:
        runner.start()
    engine.reload([Rule("gt50", lambda ctx: ctx["amount"] > 50, lambda ctx: print("Order > 50"))])
    for runner in runners:
        runner.join()
    engine.evaluate({"amount": 60})


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Fetch rules from a remote service with failover to cached rules and eventual refresh. Sample Input: Remote fetch fails intermittently. Sample Output: Engine serves cached rules and refreshes when remote recovers.
import random
import time
from dataclasses import dataclass
from typing import Callable, Dict, List
@dataclass
class Rule:
    """Condition/action pair fetched from the remote rule source."""

    name: str  # label only; evaluation ignores it
    condition: Callable[[Dict], bool]  # predicate tested against the context
    action: Callable[[Dict], None]  # handler invoked when the condition holds
class RemoteRuleSource:
    """Remote rule provider that fails intermittently."""

    def __init__(self, failure_rate: float = 0.5) -> None:
        self.failure_rate = failure_rate

    def fetch(self) -> List[Rule]:
        """Return the current rule set, or raise when the source is down."""
        if random.random() < self.failure_rate:
            raise RuntimeError("rule source down")
        audit_rule = Rule(
            "gt200",
            lambda ctx: ctx["amount"] > 200,
            lambda ctx: print("Audit order"),
        )
        return [audit_rule]
class ResilientRuleEngine:
    """Evaluates against a cached rule set refreshed from a remote source."""

    def __init__(self, source: RemoteRuleSource) -> None:
        self.source = source
        self.cache: List[Rule] = []

    def refresh(self) -> None:
        """Try to pull fresh rules; on failure keep serving the cached set."""
        try:
            fetched = self.source.fetch()
        except Exception as exc:
            print("Rule refresh failed:", exc)
            return
        self.cache = fetched
        print("Rule cache refreshed")

    def evaluate(self, context: Dict) -> None:
        """Run every cached rule whose condition matches the context."""
        for cached_rule in self.cache:
            if cached_rule.condition(context):
                cached_rule.action(context)
def main() -> None:
    """Demo: refresh/evaluate loop against an unreliable rule source."""
    random.seed(22)
    rules = ResilientRuleEngine(RemoteRuleSource(0.6))
    for _cycle in range(4):
        rules.refresh()
        rules.evaluate({"amount": 250})
        time.sleep(0.2)


if __name__ == "__main__":
    main()
Grow the leaderboard functionality from single-threaded ranking to concurrent updates and resilient aggregation with caches.
Leaderboard Stack ├─ Level 1: Core Leaderboard ├─ Level 2: Concurrent Leaderboard Updates └─ Level 3: Resilient Leaderboard Service
Level 1 — Core Implementation
Maintain a leaderboard with add/update score and retrieving top K players. Sample Input: update("alice",100), top(2). Sample Output: Ordered list of top players.
import heapq
from typing import Dict, List, Optional, Tuple
class Leaderboard:
    """Score table supporting upserts and top-K queries."""

    def __init__(self) -> None:
        self.scores: Dict[str, int] = {}

    def update(self, player: str, score: int) -> None:
        """Set (or overwrite) a player's score."""
        self.scores[player] = score

    def top(self, k: int) -> List[Tuple[int, str]]:
        """Return the k highest (score, player) pairs, best first."""
        ranked = [(points, name) for name, points in self.scores.items()]
        return heapq.nlargest(k, ranked)
def main() -> None:
    """Demo: three players ranked, top two reported."""
    board = Leaderboard()
    board.update("alice", 100)
    board.update("bob", 90)
    board.update("carol", 120)
    print("Top2:", board.top(2))


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Support high-frequency score updates with sharded locks and periodic decay to keep leaderboard fresh. Sample Input: 5 threads updating scores. Sample Output: Final ranking consistent.
from __future__ import annotations
import threading
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
class ShardedLeaderboard:
    """Score store sharded across a fixed number of lock-protected segments.

    Fixes two concurrency hazards in the original:
    - shards and locks are pre-created in __init__, so no two threads can
      race defaultdict's lazy factory and hold different Lock objects for
      the same shard;
    - snapshot() iterates a snapshot of the shard list, so concurrent
      writes cannot invalidate the iterator mid-walk.
    """

    def __init__(self, shards: int = 4) -> None:
        self.shards = shards
        # Shard count is fixed, so eagerly create one dict + lock per shard.
        self.segment_scores: Dict[int, Dict[str, int]] = {i: {} for i in range(shards)}
        self.locks: Dict[int, threading.Lock] = {i: threading.Lock() for i in range(shards)}

    def _shard(self, player: str) -> int:
        """Map a player name to its shard index."""
        return hash(player) % self.shards

    def update(self, player: str, delta: int) -> None:
        """Atomically add delta to the player's score in their shard."""
        shard = self._shard(player)
        with self.locks[shard]:
            scores = self.segment_scores[shard]
            scores[player] = scores.get(player, 0) + delta

    def snapshot(self) -> Dict[str, int]:
        """Return a merged view of all shards (each read under its lock)."""
        combined: Dict[str, int] = {}
        for shard, scores in list(self.segment_scores.items()):
            with self.locks[shard]:
                for player, score in scores.items():
                    combined[player] = combined.get(player, 0) + score
        return combined
def worker(lb: ShardedLeaderboard, player: str) -> None:
    """Thread target: apply ten unit increments for one player."""
    for _step in range(10):
        lb.update(player, 1)
def main() -> None:
    """Demo: five writer threads, then a merged snapshot."""
    board = ShardedLeaderboard()
    writers = [threading.Thread(target=worker, args=(board, f"player{i}")) for i in range(5)]
    for writer in writers:
        writer.start()
    for writer in writers:
        writer.join()
    print("Snapshot:", board.snapshot())


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Expose a leaderboard backed by a distributed cache and durable store with fallback when the cache is unavailable. Sample Input: Cache misses due to outage. Sample Output: Data is served from the durable store, and the cache is repopulated once it recovers.
import random
from typing import Dict, List, Optional, Tuple
class ShardedLeaderboard:
    """Score store sharded across a fixed number of lock-protected segments.

    Fixes two concurrency hazards in the original:
    - shards and locks are pre-created in __init__, so no two threads can
      race defaultdict's lazy factory and hold different Lock objects for
      the same shard;
    - snapshot() iterates a snapshot of the shard list, so concurrent
      writes cannot invalidate the iterator mid-walk.
    """

    def __init__(self, shards: int = 4) -> None:
        self.shards = shards
        # Shard count is fixed, so eagerly create one dict + lock per shard.
        self.segment_scores: Dict[int, Dict[str, int]] = {i: {} for i in range(shards)}
        self.locks: Dict[int, threading.Lock] = {i: threading.Lock() for i in range(shards)}

    def _shard(self, player: str) -> int:
        """Map a player name to its shard index."""
        return hash(player) % self.shards

    def update(self, player: str, delta: int) -> None:
        """Atomically add delta to the player's score in their shard."""
        shard = self._shard(player)
        with self.locks[shard]:
            scores = self.segment_scores[shard]
            scores[player] = scores.get(player, 0) + delta

    def snapshot(self) -> Dict[str, int]:
        """Return a merged view of all shards (each read under its lock)."""
        combined: Dict[str, int] = {}
        for shard, scores in list(self.segment_scores.items()):
            with self.locks[shard]:
                for player, score in scores.items():
                    combined[player] = combined.get(player, 0) + score
        return combined
class CacheLayer:
    """In-memory cache facade that randomly simulates outages on reads."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate
        self.cache: Dict[str, List[Tuple[int, str]]] = {}

    def get(self, key: str) -> List[Tuple[int, str]]:
        """Return the cached value (empty list on miss); raise when down."""
        if random.random() < self.failure_rate:
            raise RuntimeError("cache down")
        return self.cache.get(key, [])

    def set(self, key: str, value: List[Tuple[int, str]]) -> None:
        """Store a value; writes never fail in this simulation."""
        self.cache[key] = value
class ResilientLeaderboard:
    """Leaderboard reads through a cache, falling back to the durable store."""

    def __init__(self, store: ShardedLeaderboard, cache: CacheLayer) -> None:
        self.store = store
        self.cache = cache

    def update(self, player: str, score: int) -> None:
        """Write scores straight to the durable store (cache stays lazy)."""
        self.store.update(player, score)

    def top(self, k: int) -> List[Tuple[int, str]]:
        """Top-k from cache when possible; otherwise recompute and re-cache."""
        key = f"top:{k}"
        try:
            cached = self.cache.get(key)
        except Exception as exc:
            print("Cache miss due to", exc)
        else:
            if cached:
                return cached
        ranking = heapq.nlargest(
            k, [(points, name) for name, points in self.store.snapshot().items()]
        )
        try:
            self.cache.set(key, ranking)
        except Exception as exc:
            print("Cache set failed:", exc)
        return ranking
def main() -> None:
    """Demo: top-2 queries survive cache outages via the durable store."""
    random.seed(23)
    durable = ShardedLeaderboard()
    flaky_cache = CacheLayer(0.5)
    board = ResilientLeaderboard(durable, flaky_cache)
    board.update("alice", 120)
    board.update("bob", 110)
    board.update("carol", 140)
    for _query in range(3):
        print("Top:", board.top(2))


if __name__ == "__main__":
    main()
Progressively build the matchmaking system from simple queues to concurrent matchmakers and resilient session allocation.
Matchmaking Stack ├─ Level 1: Core Queue ├─ Level 2: Concurrent Matchmaker └─ Level 3: Resilient Match Service
Level 1 — Core Implementation
Match players into games using a FIFO queue grouped by skill rating. Sample Input: enqueue players with different skill buckets. Sample Output: Matches created from same bucket.
from collections import defaultdict, deque
from typing import Deque, Dict, List, Tuple
class Matchmaker:
    """FIFO matchmaking within 100-point skill bands."""

    def __init__(self) -> None:
        self.buckets: Dict[int, Deque[str]] = defaultdict(deque)

    def enqueue(self, player: str, skill: int) -> None:
        """Queue a player in the band for their skill rating."""
        self.buckets[skill // 100].append(player)

    def match(self) -> List[Tuple[str, str]]:
        """Drain every band two players at a time, FIFO within a band."""
        pairs: List[Tuple[str, str]] = []
        for waiting in self.buckets.values():
            while len(waiting) > 1:
                first = waiting.popleft()
                second = waiting.popleft()
                pairs.append((first, second))
        return pairs
def main() -> None:
    """Demo: four players in two skill bands produce two matches."""
    lobby = Matchmaker()
    lobby.enqueue("alice", 120)
    lobby.enqueue("bob", 130)
    lobby.enqueue("carol", 205)
    lobby.enqueue("dave", 215)
    print("Matches:", lobby.match())


if __name__ == "__main__":
    main()
Level 2 — Concurrent Enhancements
Allow concurrent player arrivals while a background matcher continuously forms games. Sample Input: Threads enqueue players. Sample Output: Matches printed by background thread without data races.
from __future__ import annotations
import threading
import time
from collections import defaultdict, deque
from typing import Deque, Dict
class ConcurrentMatchmaker:
    """Matchmaker with a background daemon thread that pairs queued players.

    Players are bucketed by skill // 100; the loop drains each bucket two
    at a time. Lock creation is guarded so two threads cannot race the
    original defaultdict(threading.Lock) factory and end up holding
    different Lock objects for the same bucket.
    """

    def __init__(self) -> None:
        self.buckets: Dict[int, Deque[str]] = defaultdict(deque)
        self.locks: Dict[int, threading.Lock] = {}
        self._locks_guard = threading.Lock()  # protects lazy lock creation
        self.stop = False
        threading.Thread(target=self._loop, daemon=True).start()

    def _lock_for(self, bucket: int) -> threading.Lock:
        """Return the bucket's lock, creating it exactly once under a guard."""
        with self._locks_guard:
            return self.locks.setdefault(bucket, threading.Lock())

    def enqueue(self, player: str, skill: int) -> None:
        """Add a player to the bucket for their skill band."""
        bucket = skill // 100
        with self._lock_for(bucket):
            self.buckets[bucket].append(player)

    def _loop(self) -> None:
        """Background pairing loop; snapshots items() so concurrent bucket
        creation cannot invalidate the iterator."""
        while not self.stop:
            for bucket, queue in list(self.buckets.items()):
                with self._lock_for(bucket):
                    while len(queue) >= 2:
                        p1 = queue.popleft()
                        p2 = queue.popleft()
                        print("Match:", p1, p2)
            time.sleep(0.1)

    def shutdown(self) -> None:
        """Signal the background loop to exit after its current pass."""
        self.stop = True
def worker(mm: ConcurrentMatchmaker, player: str, skill: int) -> None:
    """Thread target: enqueue a single player arrival."""
    mm.enqueue(player, skill)
def main() -> None:
    """Demo: six arrivals while the background matcher forms games."""
    lobby = ConcurrentMatchmaker()
    arrivals = [
        threading.Thread(target=worker, args=(lobby, f"player{i}", 100 + i * 10))
        for i in range(6)
    ]
    for arrival in arrivals:
        arrival.start()
    for arrival in arrivals:
        arrival.join()
    time.sleep(0.5)
    lobby.shutdown()


if __name__ == "__main__":
    main()
Level 3 — Resilient Architecture
Keep matchmaking running when the game session allocator fails by using retries, backoff, and fallback queues. Sample Input: Create matches with flaky allocator. Sample Output: Successful allocations logged; failures retried or queued.
import random
import time
from collections import deque
from typing import Deque, Tuple
class SessionAllocator:
    """Game-session allocator that fails at a configurable rate."""

    def __init__(self, failure_rate: float = 0.4) -> None:
        self.failure_rate = failure_rate

    def allocate(self, players: Tuple[str, str]) -> None:
        """Allocate a session for the pair, or raise on simulated failure."""
        success = random.random() >= self.failure_rate
        if not success:
            raise RuntimeError("allocation failed")
        print("Session allocated for", players)
class ResilientMatchmaker:
    """Feeds matches to a flaky allocator, re-queueing failures for retry."""

    def __init__(self, allocator: SessionAllocator) -> None:
        self.allocator = allocator
        self.pending: Deque[Tuple[str, str]] = deque()

    def submit_match(self, players: Tuple[str, str]) -> None:
        """Enqueue a match and immediately attempt to drain the queue."""
        self.pending.append(players)
        self._process()

    def _process(self) -> None:
        """One drain pass: allocate each pending match, re-queue failures."""
        deferred = []
        while self.pending:
            pair = self.pending.popleft()
            try:
                self.allocator.allocate(pair)
            except Exception as exc:
                print("Allocation failed:", exc)
                deferred.append(pair)
        for pair in deferred:
            self.pending.append(pair)
            time.sleep(0.1)
def main() -> None:
    """Demo: three matches pushed through a flaky session allocator."""
    random.seed(24)
    service = ResilientMatchmaker(SessionAllocator(0.5))
    fixtures = [("alice", "bob"), ("carol", "dave"), ("eve", "frank")]
    for fixture in fixtures:
        service.submit_match(fixture)
    service._process()


if __name__ == "__main__":
    main()