CS 4459: Distributed Order Processing¶
Due Date
March 25, 11:55 PM
In Assignment 2, you built a centralized lock server to coordinate access to a shared booking database. It worked, but the lock server was a single point of failure: if it crashed, every booking agent was stuck.
In this assignment, you will eliminate the central coordinator entirely. Multiple order-processing microservices will coordinate access to a shared inventory database peer-to-peer, using a message-based distributed mutual exclusion protocol.
No single service is in charge. Every service makes its own decisions.
The Use Case¶
An e-commerce platform processes orders through N independent microservices. All services update a shared inventory database, but the legacy database can only handle one write transaction at a time to prevent data corruption.
Without coordination:
- Two services see 1 unit of an item in stock
- Both try to sell it
- One customer gets an order that can never be fulfilled — an oversell
In Assignment 2, a central lock server solved this. But now imagine that server goes down during Black Friday. Every order stalls. Your job is to build a system where services coordinate among themselves — no single point of failure.
Learning Objectives¶
- Peer-to-peer mutual exclusion — no central coordinator
- Lamport logical clocks — ordering events across distributed nodes without synchronized clocks
- Message-based coordination — REQUEST/REPLY protocol with deferred replies
- Optimal message complexity — exactly 2(N-1) messages per critical section entry
- Containerization — package services with Docker
- Idempotent operations — safely handle duplicate requests in unreliable networks
System Architecture¶
The system consists of three types of components running as Docker containers on a shared network. There is no central coordinator.
graph TB
subgraph Workers["Order Processors (Peer-to-Peer)"]
W1[Worker 1<br/>FastAPI :5001]
W2[Worker 2<br/>FastAPI :5002]
W3[Worker 3<br/>FastAPI :5003]
WN[Worker N<br/>FastAPI :500N]
end
subgraph Data["Data Layer"]
IS[Inventory Service<br/>FastAPI :8000]
DB[(PostgreSQL<br/>:5432)]
end
W1 <-.->|"REQUEST / REPLY"| W2
W1 <-.->|"REQUEST / REPLY"| W3
W2 <-.->|"REQUEST / REPLY"| W3
W2 <-.->|"REQUEST / REPLY"| WN
W3 <-.->|"REQUEST / REPLY"| WN
W1 <-.->|"REQUEST / REPLY"| WN
W1 ==>|"process order"| IS
W2 ==>|"process order"| IS
W3 ==>|"process order"| IS
WN ==>|"process order"| IS
IS -->|SQL| DB
style IS fill:#F39C12,stroke:#D68910,stroke-width:3px,color:#fff
style DB fill:#50C878,stroke:#2E7D4E,stroke-width:3px,color:#fff
style W1 fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff
style W2 fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff
style W3 fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff
style WN fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff
No Lock Server
Compare this diagram with Assignment 2. The Lock Server box is gone. Workers coordinate by sending REQUEST and REPLY messages directly to each other.
Components¶
| Component | Technology | Port | Who Implements |
|---|---|---|---|
| Worker Service | FastAPI + uvicorn + requests | 5001–500N | Student |
| Inventory Service | FastAPI + uvicorn + psycopg2 | 8000 | Student (partially provided) |
| Database | PostgreSQL 16 | 5432 | Provided |
| Infrastructure | Docker + Docker Compose | — | Provided |
Network & Communication¶
All containers run on a shared Docker network (orderprocessing-net). Containers communicate using service names as hostnames (e.g., http://worker-1:5001).
| From | To | Protocol | Purpose |
|---|---|---|---|
| Worker | Other Workers | HTTP POST | Send REQUEST and REPLY messages |
| Worker | Inventory Service | HTTP | Check stock, process orders |
| Inventory Service | PostgreSQL | TCP (psycopg2) | SQL queries |
Workers never talk to PostgreSQL directly. All database access goes through the Inventory Service.
Key Difference from Assignment 2
In Assignment 2, workers talked to a central lock server to coordinate. Here, workers send messages to each other. When a worker wants access, it sends a REQUEST to every peer and waits until all peers have replied.
Why This Matters¶
Decentralized coordination appears throughout production systems:
- Distributed mutual exclusion — foundational protocols for peer-to-peer coordination
- Logical clocks — ordering events in systems where physical clocks can't be trusted
- Raft / Paxos — consensus protocols that build on similar message-passing patterns
- Service meshes — decentralized traffic management (Istio, Linkerd)
- Blockchain — decentralized agreement without a central authority
Key Concepts¶
Lamport Logical Clocks¶
In a distributed system, you can't rely on wall-clock time — machines' clocks drift. Lamport logical clocks provide a way to order events without synchronized clocks.
Each worker maintains a logical clock (an integer counter). The rules are:
| Event | Clock Update |
|---|---|
| Internal event (e.g., deciding to request access) | clock = clock + 1 |
| Sending a message | clock = clock + 1, attach clock as the message timestamp |
| Receiving a message with timestamp ts | clock = max(clock, ts) + 1 |
The key property: if event A happened before event B, then clock(A) < clock(B). Clock values alone give only a partial order (two workers can produce the same timestamp), but breaking ties with worker IDs, as described in the next section, yields a total ordering of events across all workers.
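The three update rules from the table above can be sketched as plain functions (a minimal illustration of the rules, not the skeleton's API):

```python
def tick(clock: int) -> int:
    """Internal event: advance the local clock."""
    return clock + 1

def on_send(clock: int) -> tuple[int, int]:
    """Sending: advance the clock and attach it as the message timestamp."""
    clock += 1
    return clock, clock  # (new clock, timestamp to attach)

def on_receive(clock: int, ts: int) -> int:
    """Receiving: jump past the sender's timestamp, then advance."""
    return max(clock, ts) + 1

print(on_receive(5, 10))  # 11 — see Question 3 for why it is not just 10
```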
Priority Ordering¶
When comparing requests, use the tuple (timestamp, worker_id):
- Lower timestamp = higher priority (you asked first)
- Equal timestamps → lower worker ID wins (deterministic tie-breaker)
Examples:
- (3, 2) has higher priority than (5, 1) — earlier timestamp wins
- (5, 1) has higher priority than (5, 2) — same timestamp, lower ID wins
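Python's built-in tuple comparison implements this ordering directly: tuples compare element by element, so comparing (timestamp, worker_id) pairs with < checks timestamps first, then IDs. A minimal illustration:

```python
def has_priority(mine: tuple[int, int], theirs: tuple[int, int]) -> bool:
    """True if `mine` (timestamp, worker_id) outranks `theirs`.

    Lower tuple = higher priority: timestamps compare first, then worker IDs.
    """
    return mine < theirs

print(has_priority((3, 2), (5, 1)))  # True — earlier timestamp wins
print(has_priority((5, 1), (5, 2)))  # True — same timestamp, lower ID wins
```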
Deferred Replies¶
When Worker A receives a REQUEST from Worker B, it must decide: grant immediately (send REPLY) or defer (reply later)?
The decision depends on A's current state and how A's priority compares to B's. When A exits the critical section, it sends REPLY to all deferred requests.
The Coordination Protocol¶
Your workers must satisfy these properties without a central coordinator:
| Property | Requirement |
|---|---|
| Mutual Exclusion | At most one worker accesses the inventory service at any time |
| Fairness | Requests are granted in timestamp order — no worker is indefinitely postponed |
| Progress | No deadlock or starvation |
| Efficiency | Each critical section entry requires exactly 2(N-1) messages: N-1 REQUESTs sent + N-1 REPLYs received |
Protocol Rules¶
Rule 1 — Requesting: To enter the critical section, a worker must send a REQUEST (containing its timestamp and ID) to every other worker and wait until it has received a REPLY from all of them.
Rule 2 — Handling a REQUEST: When a worker receives a REQUEST from a peer, it must either reply immediately or defer the reply until later, based on its own state and the relative priorities of the two requests.
Rule 3 — Handling a REPLY: When a worker receives a REPLY, it tracks the count. Once it has received N-1 replies (one from every peer), it may enter the critical section.
Rule 4 — Releasing: When a worker exits the critical section, it must send a REPLY to every peer whose reply it deferred.
Worker State¶
Each worker maintains:
| State | Type | Description |
|---|---|---|
| clock | int | Lamport logical clock (starts at 0) |
| state | enum | RELEASED, WANTED, or HELD |
| request_timestamp | int or None | The clock value when this worker requested access |
| replies_received | int | Count of REPLY messages received for the current request |
| deferred_queue | list | Worker IDs whose REPLY has been deferred |
Message Flow Example¶
sequenceDiagram
participant W1 as Worker 1
participant W2 as Worker 2
participant W3 as Worker 3
participant IS as Inventory Service
Note over W1: Wants access
W1->>W2: REQUEST
W1->>W3: REQUEST
Note over W2: Not requesting → reply
W2-->>W1: REPLY
Note over W3: Also wants access
W3->>W1: REQUEST
W3->>W2: REQUEST
Note over W1: I have priority → defer W3
Note over W3: W1 has priority → reply
W3-->>W1: REPLY
Note over W1: All replies → enter critical section
W1->>IS: POST /process-order
IS-->>W1: {success: true}
Note over W1: Release → send deferred replies
W1-->>W3: REPLY
W2-->>W3: REPLY
Note over W3: All replies → enter critical section
W3->>IS: POST /process-order
IS-->>W3: {success: true}
Question 1: When should you defer vs. reply immediately?
Rule 2 says to decide based on "state and relative priorities." There are three states (RELEASED, WANTED, HELD) and two priority outcomes (you win or they win). Work out the correct action for each combination. What goes wrong if you always reply immediately? What goes wrong if you always defer?
Question 2: Why exactly 2(N-1) messages?
Each request sends N-1 REQUEST messages and receives N-1 REPLY messages. No polling, no retries — every message is meaningful. Compare this to Assignment 2's centralized approach (2 messages per request) and ask yourself: what's the trade-off?
Question 3: Why does the clock update on receive use max + 1?
If Worker A has clock 5 and receives a message with timestamp 10, A's clock jumps to 11. Why not just use 10? What ordering guarantee would break?
Question 4: Why can't you just use wall-clock time instead of Lamport clocks?
Two machines might both report the same wall-clock time for different events. Even with NTP, clocks can drift by milliseconds. How would this break the priority ordering?
Project Structure¶
cs4459_a3/
├── worker/ # ⬅ YOU IMPLEMENT
│ ├── Dockerfile # Provided
│ ├── requirements.txt # Provided
│ └── worker.py # Skeleton — you build the coordination logic
├── inventory_service/ # ⬅ YOU IMPLEMENT POST /process-order
│ ├── Dockerfile # Provided
│ ├── requirements.txt # Provided
│ ├── helpers.py # Provided — DO NOT MODIFY (get_db_connection)
│ └── inventory_server.py # Most endpoints provided — you implement /process-order
├── database/ # Provided — DO NOT MODIFY
│ └── init.sql # Creates tables and seeds 200 items
└── docker-compose.yml # Provided — orchestrates all services
Getting Started¶
Prerequisites¶
You need the same tools as Assignment 2:
| Tool | Version | Purpose |
|---|---|---|
| Docker | 24+ | Container runtime |
| Docker Compose | v2+ | Multi-container orchestration |
| Python | 3.11+ | Local testing (optional) |
| Git | Any | Version control |
Build and Start¶
git clone https://github.com/MuAlarbi/cs4459_assignment3.git
cd cs4459_assignment3
docker compose up --build
You should see output from four services:
inventory-service-1 | INFO: Uvicorn running on http://0.0.0.0:8000
worker-1 | INFO: Uvicorn running on http://0.0.0.0:5001
worker-2 | INFO: Uvicorn running on http://0.0.0.0:5002
worker-3 | INFO: Uvicorn running on http://0.0.0.0:5003
db-1 | LOG: database system is ready to accept connections
Workers won't do anything yet
The skeleton code starts the FastAPI servers but has no endpoints implemented. Hitting POST /start or GET /state will return 404. Your first task is to get the endpoints working — then implement the coordination logic.
Verify Services Are Running¶
Once you've implemented your endpoints, verify they respond:
# Check worker state
curl http://localhost:5001/state
# Should return: {"worker_id": 1, "clock": 0, "state": "RELEASED", ...}
# Check inventory
curl http://localhost:8000/inventory/summary
# Should return: {"total_items": 200, "available": 200, "sold": 0, "orders_processed": 0}
See the Problem (No Coordination)¶
To see why coordination matters, try implementing POST /start without acquire() / release() — just query an available item and process it directly. Then trigger all workers simultaneously:
for i in 1 2 3; do
curl -X POST http://localhost:500$i/start &
done
wait
curl http://localhost:8000/inventory/summary
Expected: Oversells
Without coordination, multiple workers will process the same items simultaneously. You should see more orders processed than items available — this is the problem the coordination protocol solves.
Worker Service¶
Each worker is a FastAPI microservice. You are given a minimal skeleton — the FastAPI app, environment variables, and empty stubs. You build everything else.
What's Provided¶
from fastapi import FastAPI
import os
import requests
import time
import threading
app = FastAPI()
# --- Environment Variables (set by Docker Compose) ---
WORKER_ID = int(os.environ["WORKER_ID"])
NUM_WORKERS = int(os.environ["NUM_WORKERS"])
NUM_ORDERS = int(os.environ.get("NUM_ORDERS", "50"))
INVENTORY_URL = os.environ.get("INVENTORY_URL", "http://inventory-service:8000")
def get_peer_url(worker_id: int) -> str:
    """Returns the base URL for a peer worker."""
    return f"http://worker-{worker_id}:{5000 + worker_id}"
# --- Coordination State ---
# Use state_lock to protect all coordination state.
# Multiple threads access this state: the /start processing loop,
# the /request handler, and the /reply handler.
state_lock = threading.Lock()
clock = 0
state = "RELEASED" # "RELEASED", "WANTED", or "HELD"
request_timestamp = None # Clock value when we requested access
replies_received = 0 # Count of REPLY messages for current request
deferred_queue = [] # Worker IDs whose REPLY we deferred
# Event to signal when all replies have been received
all_replies = threading.Event()
# ============================================================
# TODO: Implement the coordination protocol and endpoints
# ============================================================
# --- Coordination Functions ---
def acquire():
    """
    Request access to the critical section.
    Sends REQUEST to all peers, then waits until all have replied.
    """
    # TODO
    pass

def release():
    """
    Release access to the critical section.
    Sends deferred REPLY messages to all waiting peers.
    """
    # TODO
    pass
# --- Message Handlers ---
# TODO: Implement POST /request
# Receives a REQUEST(timestamp, sender_id) from a peer.
# Must decide: reply immediately or defer.
# TODO: Implement POST /reply
# Receives a REPLY(timestamp) from a peer.
# Must update clock and track reply count.
# --- Other Endpoints ---
# TODO: Implement GET /state
# Must return: {"worker_id": ..., "clock": ..., "state": ...,
# "request_timestamp": ..., "replies_received": ...,
# "deferred_count": ...}
# TODO: Implement POST /start
# Must: loop NUM_ORDERS times, acquire, get available item,
# call POST /process-order with idempotency_key, release.
# Stop early if no items available.
# Must return: {"status": "completed", "worker_id": ..., "orders_processed": ...}
# TODO: Implement GET /peers
# Must return: {"worker_id": ..., "peers": [...]}
What You Must Build¶
Using the skeleton above, you implement:
- acquire() — set state, increment clock, send REQUESTs, wait for all REPLYs
- release() — set state, send deferred REPLYs
- POST /request — receive a REQUEST, decide to reply or defer
- POST /reply — receive a REPLY, update clock, track count
- GET /state — return coordination state for debugging and grading
- POST /start — order processing loop (acquire → query items → process → release)
- GET /peers — return peer URLs
- All HTTP calls — to peers and to the inventory service, using the requests library
Function names are mandatory
Your worker.py must contain acquire and release as callable functions. The auto-grader verifies correct behavior through the HTTP endpoints and the access_log in the database.
Threading is critical
The POST /request and POST /reply endpoints receive messages from peers while your acquire() is blocking. FastAPI handles these in separate threads. Use state_lock to protect shared state and all_replies (a threading.Event) to signal when all replies arrive. Never hold state_lock while making HTTP calls — this will deadlock.
Inventory Service¶
The inventory service manages the shared inventory database. Most of it is provided — you implement one endpoint with a real distributed systems concern: idempotent order processing.
Why Idempotency?¶
In a distributed system, HTTP requests can fail in ambiguous ways. Consider this scenario:
- Worker 1 sends POST /process-order to sell item 47
- The inventory service processes the order successfully
- The HTTP response is lost (network timeout)
- Worker 1 doesn't know if the order went through — it retries
- Without idempotency, item 47 gets processed twice
Production APIs (Stripe, AWS, any payment system) solve this with idempotency keys: a unique identifier for each logical operation. If the service sees the same key twice, it returns the original result instead of re-processing.
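The idea can be demonstrated with a toy in-memory store, where a dict stands in for the order_history table (illustration only; your real implementation queries PostgreSQL):

```python
orders: dict[str, int] = {}   # idempotency_key -> order_id (toy order_history)
next_order_id = 1

def process_order(idempotency_key: str) -> tuple[int, bool]:
    """Returns (order_id, duplicate). Re-sending the same key is safe."""
    global next_order_id
    if idempotency_key in orders:             # seen before: return original result
        return orders[idempotency_key], True
    order_id = next_order_id                  # first time: actually process
    next_order_id += 1
    orders[idempotency_key] = order_id
    return order_id, False

print(process_order("w1-order-0001"))  # (1, False) — processed
print(process_order("w1-order-0001"))  # (1, True)  — retry returns the same order
```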
Database Schema¶
Four tables are provided (in database/init.sql):
inventory table — 200 items, each with quantity = 1.
| Column | Type | Description |
|---|---|---|
| item_id | SERIAL PRIMARY KEY | Unique item identifier |
| item_name | VARCHAR(100) | Item name |
| quantity | INTEGER | Units in stock (starts at 1) |
| status | VARCHAR(20) | 'available' or 'sold_out' |
order_history table — Records every successful order.
| Column | Type | Description |
|---|---|---|
| order_id | SERIAL PRIMARY KEY | Auto-incrementing order ID |
| item_id | INTEGER REFERENCES inventory | Which item was ordered |
| worker_id | INTEGER | Which worker processed it |
| idempotency_key | VARCHAR(100) UNIQUE | Unique key for this operation |
| processed_at | TIMESTAMP DEFAULT NOW() | When it was processed |
access_log table — Audit trail for mutual exclusion verification.
| Column | Type | Description |
|---|---|---|
| log_id | SERIAL PRIMARY KEY | Auto-incrementing log ID |
| worker_id | INTEGER | Which worker |
| action | VARCHAR(50) | 'enter' or 'exit' |
| timestamp | TIMESTAMP DEFAULT NOW() | When it occurred |
What's Provided¶
The following endpoints are already implemented in the skeleton:
- GET /inventory/available — Returns available items
- GET /inventory/summary — Returns inventory counts
- GET /access-log — Returns the access audit log
The only helper is get_db_connection(), which returns a psycopg2 connection to PostgreSQL.
What You Implement¶
POST /process-order — The critical section operation. Processes a single order idempotently.
Request body:
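Per the API reference, the body carries the worker ID, the item, and an idempotency key. For example (values illustrative):

```json
{
  "worker_id": 1,
  "item_id": 47,
  "idempotency_key": "worker-1-order-0001"
}
```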
This endpoint must:
- Log entry — Write to access_log with action 'enter'
- Check idempotency — Query order_history for the idempotency_key. If it already exists, skip processing and return the original result (the order that was already created)
- Validate — Check that the item exists and is still available
- Process — Update the inventory table (quantity = 0, status = 'sold_out') and insert into order_history (including the idempotency_key)
- Log exit — Write to access_log with action 'exit'
Response (success — first attempt):
Response (success — duplicate request, same idempotency key):
Response (item unavailable):
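Plausible shapes for the three responses above, using the fields from the API reference ({success, order_id?, duplicate?, reason?}); exact values and the reason string are illustrative:

```
# success — first attempt
{"success": true, "order_id": 1, "duplicate": false}

# success — duplicate request, same idempotency key (same order_id as the original)
{"success": true, "order_id": 1, "duplicate": true}

# item unavailable
{"success": false, "reason": "item_unavailable"}
```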
Testing¶
Running the Tests¶
Start the system:
Trigger workers:
Check results:
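The corresponding commands, reusing those shown in Getting Started and See the Problem (worker count and ports assume the default docker-compose.yml with three workers):

```shell
# Start the system
docker compose up --build

# Trigger all workers simultaneously (in another terminal)
for i in 1 2 3; do
  curl -X POST http://localhost:500$i/start &
done
wait

# Check results: inventory counts and the mutual exclusion audit trail
curl http://localhost:8000/inventory/summary
curl http://localhost:8000/access-log
```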
What You Should Test¶
Before submitting, make sure your system handles the following correctly:
Mutual Exclusion — Verify that no two workers are inside the critical section at the same time. If your coordination logic is wrong, you will see overlapping entries.
Correctness — Verify that no item is sold more than once (no oversells), that the total orders processed matches the number of sold items, and that every order corresponds to a valid inventory change.
Fairness — Run the system multiple times with varying worker counts and verify that all workers eventually complete (no starvation) and no single worker processes a disproportionate share of orders.
Idempotency — Verify that sending duplicate order requests with the same idempotency key only decrements inventory once, returns the same order ID, and correctly indicates the request was a duplicate.
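One way to check the access log for mutual exclusion violations is a linear scan. This sketch assumes entries arrive in chronological order with worker_id and action fields, matching the access_log schema above:

```python
def find_violations(entries: list[dict]) -> list[dict]:
    """Return every 'enter' that occurred while another worker held access."""
    holder = None                      # worker currently inside the critical section
    violations = []
    for e in entries:
        if e["action"] == "enter":
            if holder is not None:     # someone entered before the holder exited
                violations.append(e)
            else:
                holder = e["worker_id"]
        elif e["action"] == "exit" and e["worker_id"] == holder:
            holder = None
    return violations

log = [
    {"worker_id": 1, "action": "enter"},
    {"worker_id": 1, "action": "exit"},
    {"worker_id": 2, "action": "enter"},
    {"worker_id": 3, "action": "enter"},   # violation: worker 2 has not exited
]
print(find_violations(log))  # [{'worker_id': 3, 'action': 'enter'}]
```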
Testing Tip
The auto-grader will evaluate all of the above. Use curl or a simple script to exercise these scenarios yourself before submitting.
API Reference¶
Worker Endpoints¶
| Endpoint | Method | Description |
|---|---|---|
| /state | GET | Returns {worker_id, clock, state, request_timestamp, replies_received, deferred_count} |
| /start | POST | Triggers order processing, returns {status, worker_id, orders_processed} |
| /request | POST | Receives {timestamp, sender_id} — handles incoming REQUEST |
| /reply | POST | Receives {timestamp, sender_id} — handles incoming REPLY |
| /peers | GET | Returns {worker_id, peers[]} |
Inventory Service Endpoints¶
| Endpoint | Method | Description |
|---|---|---|
| /inventory/available | GET | Returns {items[], count} (provided) |
| /process-order | POST | Accepts {worker_id, item_id, idempotency_key}, returns {success, order_id?, duplicate?, reason?} |
| /inventory/summary | GET | Returns {total_items, available, sold, orders_processed} (provided) |
| /access-log | GET | Returns {entries[], count} (provided) |
Worker Functions (must exist in worker.py)¶
| Function | Description |
|---|---|
| acquire() | Sends REQUESTs, waits for all REPLYs |
| release() | Sends deferred REPLYs, resets state |
How grading works
The auto-grader tests your coordination logic by hitting POST /start on all workers simultaneously, then checking GET /access-log for mutual exclusion violations and GET /inventory/summary for oversells. Idempotency is tested by sending duplicate POST /process-order requests directly.
Deliverables¶
Submit the following files:
| File | Description |
|---|---|
| worker/worker.py | Your complete worker with message-based coordination and all endpoints |
| inventory_service/inventory_server.py | Your complete inventory service with idempotent POST /process-order |
| a3.pdf | Your answers to Questions 1–4 |
Do not modify provided files
Do not modify helpers.py, init.sql, Dockerfile, requirements.txt, docker-compose.yml. Your submission will be tested against the original versions of these files.
Hints¶
Hint 1: Start small
Get mutual exclusion working with 2 workers first. With 2 workers, there's only 1 peer — 1 REQUEST sent, 1 REPLY received. Much easier to debug.
Hint 2: Threading is the hardest part
acquire() blocks waiting for replies. POST /request and POST /reply arrive on separate threads. Use threading.Event to signal between them — acquire() calls all_replies.wait(), and the /reply handler calls all_replies.set() when the count reaches N-1.
Hint 3: Never hold the lock during HTTP calls
Acquire state_lock, update state, release state_lock, then send HTTP messages. If you hold the lock while sending a REQUEST, and the peer's handler tries to acquire the lock to process it — deadlock.
Hint 4: Test idempotency independently
Before wiring up the coordination logic, test your POST /process-order with curl. Send the same request twice with the same idempotency_key and verify you get "duplicate": true on the second call with the same order_id.
Hint 5: Use logging liberally
Print every message sent and received with clock values: [Worker 1, clock=5] Sending REQUEST to Worker 2. This trace will save you hours of debugging.
Concepts to Understand¶
This assignment covers the following concepts from lectures:
- Mutual exclusion — the safety property that at most one process is in its critical section
- Lamport logical clocks — a mechanism for ordering events in a distributed system without synchronized physical clocks
- Happened-before relation — the causal ordering that logical clocks capture
- Message complexity — 2(N-1) messages per critical section entry, and why this is optimal for a permission-based protocol
- Idempotency — ensuring that repeated requests produce the same result, critical for reliability in distributed systems
- Single point of failure — the fundamental weakness of centralized coordination (Assignment 2) and why decentralized approaches matter
Academic Integrity¶
This is an individual assignment. You may discuss high-level concepts with classmates, but all code must be your own work. Do not share code or look at other students' implementations. Violations will result in a grade of zero and referral to the Academic Integrity Office.
Resources¶
- Course lecture notes on distributed mutual exclusion and logical clocks
- Textbook: Tanenbaum & van Steen, Chapter 6 (Coordination)