
CS 4459: Distributed Order Processing

Due Date

March 25, 11:55 PM

In Assignment 2, you built a centralized lock server to coordinate access to a shared booking database. It worked — but the lock server was a single point of failure: if it crashed, every booking agent was stuck.

In this assignment, you will eliminate the central coordinator entirely. Multiple order-processing microservices will coordinate access to a shared inventory database peer-to-peer, using a message-based distributed mutual exclusion protocol.

No single service is in charge. Every service makes its own decisions.


The Use Case

An e-commerce platform processes orders through N independent microservices. All services update a shared inventory database, but the legacy database can only handle one write transaction at a time to prevent data corruption.

Without coordination:

  • Two services see 1 unit of an item in stock
  • Both try to sell it
  • One customer gets an order that can never be fulfilled — an oversell

In Assignment 2, a central lock server solved this. But now imagine that server goes down during Black Friday. Every order stalls. Your job is to build a system where services coordinate among themselves — no single point of failure.


Learning Objectives

  • Peer-to-peer mutual exclusion — no central coordinator
  • Lamport logical clocks — ordering events across distributed nodes without synchronized clocks
  • Message-based coordination — REQUEST/REPLY protocol with deferred replies
  • Optimal message complexity — exactly 2(N-1) messages per critical section entry
  • Containerization — package services with Docker
  • Idempotent operations — safely handle duplicate requests in unreliable networks

System Architecture

The system consists of three types of components running as Docker containers on a shared network. There is no central coordinator.

graph TB
    subgraph Workers["Order Processors (Peer-to-Peer)"]
        W1[Worker 1<br/>FastAPI :5001]
        W2[Worker 2<br/>FastAPI :5002]
        W3[Worker 3<br/>FastAPI :5003]
        WN[Worker N<br/>FastAPI :500N]
    end

    subgraph Data["Data Layer"]
        IS[Inventory Service<br/>FastAPI :8000]
        DB[(PostgreSQL<br/>:5432)]
    end

    W1 <-.->|"REQUEST / REPLY"| W2
    W1 <-.->|"REQUEST / REPLY"| W3
    W2 <-.->|"REQUEST / REPLY"| W3
    W2 <-.->|"REQUEST / REPLY"| WN
    W3 <-.->|"REQUEST / REPLY"| WN
    W1 <-.->|"REQUEST / REPLY"| WN

    W1 ==>|"process order"| IS
    W2 ==>|"process order"| IS
    W3 ==>|"process order"| IS
    WN ==>|"process order"| IS

    IS -->|SQL| DB

    style IS fill:#F39C12,stroke:#D68910,stroke-width:3px,color:#fff
    style DB fill:#50C878,stroke:#2E7D4E,stroke-width:3px,color:#fff
    style W1 fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff
    style W2 fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff
    style W3 fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff
    style WN fill:#9B59B6,stroke:#6C3483,stroke-width:2px,color:#fff

No Lock Server

Compare this diagram with Assignment 2. The Lock Server box is gone. Workers coordinate by sending REQUEST and REPLY messages directly to each other.

Components

Component Technology Port Who Implements
Worker Service FastAPI + uvicorn + requests 5001–500N Student
Inventory Service FastAPI + uvicorn + psycopg2 8000 Student (partially provided)
Database PostgreSQL 16 5432 Provided
Infrastructure Docker + Docker Compose Provided

Network & Communication

All containers run on a shared Docker network (orderprocessing-net). Containers communicate using service names as hostnames (e.g., http://worker-1:5001).

From To Protocol Purpose
Worker Other Workers HTTP POST Send REQUEST and REPLY messages
Worker Inventory Service HTTP Check stock, process orders
Inventory Service PostgreSQL TCP (psycopg2) SQL queries

Workers never talk to PostgreSQL directly. All database access goes through the Inventory Service.

Key Difference from Assignment 2

In Assignment 2, workers talked to a central lock server to coordinate. Here, workers send messages to each other. When a worker wants access, it sends a REQUEST to every peer and waits until all peers have replied.


Why This Matters

Decentralized coordination appears throughout production systems:

  • Distributed mutual exclusion — foundational protocols for peer-to-peer coordination
  • Logical clocks — ordering events in systems where physical clocks can't be trusted
  • Raft / Paxos — consensus protocols that build on similar message-passing patterns
  • Service meshes — decentralized traffic management (Istio, Linkerd)
  • Blockchain — decentralized agreement without a central authority

Key Concepts

Lamport Logical Clocks

In a distributed system, you can't rely on wall-clock time — machines' clocks drift. Lamport logical clocks provide a way to order events without synchronized clocks.

Each worker maintains a logical clock (an integer counter). The rules are:

Event Clock Update
Internal event (e.g., deciding to request access) clock = clock + 1
Sending a message clock = clock + 1, attach clock as the message timestamp
Receiving a message with timestamp ts clock = max(clock, ts) + 1

The key property: if event A happened before event B, then clock(A) < clock(B). Clock values alone can tie across workers; combined with worker IDs as a tie-breaker (next section), they yield a total ordering of events across all workers.
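The three update rules map almost line-for-line onto code. A minimal sketch (the skeleton stores the clock as a plain int, so treat this class as illustration, not a required structure):

```python
# Minimal sketch of the clock rules above. The skeleton keeps the clock as
# a plain int; this class is illustrative only.
class LamportClock:
    def __init__(self):
        self.clock = 0

    def tick(self):
        # Internal event: clock = clock + 1
        self.clock += 1
        return self.clock

    def on_send(self):
        # Sending: increment, then attach the value as the message timestamp
        self.clock += 1
        return self.clock

    def on_receive(self, ts):
        # Receiving a message with timestamp ts: clock = max(clock, ts) + 1
        self.clock = max(self.clock, ts) + 1
        return self.clock
```

For example, a worker at clock 5 that receives a message stamped 10 moves to 11.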

Priority Ordering

When comparing requests, use the tuple (timestamp, worker_id):

  • Lower timestamp = higher priority (you asked first)
  • Equal timestamps → lower worker ID wins (deterministic tie-breaker)

Examples:

  • (3, 2) has higher priority than (5, 1) — earlier timestamp wins
  • (5, 1) has higher priority than (5, 2) — same timestamp, lower ID wins
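Python compares tuples lexicographically (first element, then second), so the priority rule collapses to a single comparison:

```python
# Priority via lexicographic tuple ordering: a smaller tuple wins.
def has_priority(mine, theirs):
    """True if my (timestamp, worker_id) request beats theirs."""
    return mine < theirs  # compares timestamps first, then worker IDs

print(has_priority((3, 2), (5, 1)))  # True: earlier timestamp wins
print(has_priority((5, 1), (5, 2)))  # True: same timestamp, lower ID wins
```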

Deferred Replies

When Worker A receives a REQUEST from Worker B, it must decide: grant immediately (send REPLY) or defer (reply later)?

The decision depends on A's current state and how A's priority compares to B's. When A exits the critical section, it sends REPLY to all deferred requests.


The Coordination Protocol

Your workers must satisfy these properties without a central coordinator:

Property Requirement
Mutual Exclusion At most one worker accesses the inventory service at any time
Fairness Requests are granted in timestamp order — no worker is indefinitely postponed
Progress No deadlock or starvation
Efficiency Each critical section entry requires exactly 2(N-1) messages: N-1 REQUESTs sent + N-1 REPLYs received

Protocol Rules

Rule 1 — Requesting: To enter the critical section, a worker must send a REQUEST (containing its timestamp and ID) to every other worker and wait until it has received a REPLY from all of them.

Rule 2 — Handling a REQUEST: When a worker receives a REQUEST from a peer, it must either reply immediately or defer the reply until later, based on its own state and the relative priorities of the two requests.

Rule 3 — Handling a REPLY: When a worker receives a REPLY, it tracks the count. Once it has received N-1 replies (one from every peer), it may enter the critical section.

Rule 4 — Releasing: When a worker exits the critical section, it must send a REPLY to every peer whose reply it deferred.

Worker State

Each worker maintains:

State Type Description
clock int Lamport logical clock (starts at 0)
state enum RELEASED, WANTED, or HELD
request_timestamp int or None The clock value when this worker requested access
replies_received int Count of REPLY messages received for the current request
deferred_queue list Worker IDs whose REPLY has been deferred

Message Flow Example

sequenceDiagram
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant W3 as Worker 3
    participant IS as Inventory Service

    Note over W1: Wants access
    W1->>W2: REQUEST
    W1->>W3: REQUEST

    Note over W2: Not requesting → reply
    W2-->>W1: REPLY

    Note over W3: Also wants access
    W3->>W1: REQUEST
    W3->>W2: REQUEST

    Note over W2: Not requesting → reply
    W2-->>W3: REPLY

    Note over W1: I have priority → defer W3
    Note over W3: W1 has priority → reply
    W3-->>W1: REPLY

    Note over W1: All replies → enter critical section
    W1->>IS: POST /process-order
    IS-->>W1: {success: true}

    Note over W1: Release → send deferred reply
    W1-->>W3: REPLY

    Note over W3: All replies → enter critical section
    W3->>IS: POST /process-order
    IS-->>W3: {success: true}

Question 1: When should you defer vs. reply immediately?

Rule 2 says to decide based on "state and relative priorities." There are three states (RELEASED, WANTED, HELD) and two priority outcomes (you win or they win). Work out the correct action for each combination. What goes wrong if you always reply immediately? What goes wrong if you always defer?

Question 2: Why exactly 2(N-1) messages?

Each request sends N-1 REQUEST messages and receives N-1 REPLY messages. No polling, no retries — every message is meaningful. Compare this to Assignment 2's centralized approach (2 messages per request) and ask yourself: what's the trade-off?

Question 3: Why does the clock update on receive use max + 1?

If Worker A has clock 5 and receives a message with timestamp 10, A's clock jumps to 11. Why not just use 10? What ordering guarantee would break?

Question 4: Why can't you just use wall-clock time instead of Lamport clocks?

Two machines might both report the same wall-clock time for different events. Even with NTP, clocks can drift by milliseconds. How would this break the priority ordering?


Project Structure

cs4459_a3/
├── worker/                      # ⬅ YOU IMPLEMENT
│   ├── Dockerfile               # Provided
│   ├── requirements.txt         # Provided
│   └── worker.py                # Skeleton — you build the coordination logic
├── inventory_service/           # ⬅ YOU IMPLEMENT POST /process-order
│   ├── Dockerfile               # Provided
│   ├── requirements.txt         # Provided
│   ├── helpers.py               # Provided — DO NOT MODIFY (get_db_connection)
│   └── inventory_server.py      # Most endpoints provided — you implement /process-order
├── database/                    # Provided — DO NOT MODIFY
│   └── init.sql                 # Creates tables and seeds 200 items
└── docker-compose.yml           # Provided — orchestrates all services

Getting Started

Prerequisites

You need the same tools as Assignment 2:

Tool Version Purpose
Docker 24+ Container runtime
Docker Compose v2+ Multi-container orchestration
Python 3.11+ Local testing (optional)
Git Any Version control

Build and Start

git clone https://github.com/MuAlarbi/cs4459_assignment3.git
cd cs4459_assignment3
docker compose up --build

You should see output from all five containers:

inventory-service-1  | INFO:     Uvicorn running on http://0.0.0.0:8000
worker-1             | INFO:     Uvicorn running on http://0.0.0.0:5001
worker-2             | INFO:     Uvicorn running on http://0.0.0.0:5002
worker-3             | INFO:     Uvicorn running on http://0.0.0.0:5003
db-1                 | LOG:  database system is ready to accept connections

Workers won't do anything yet

The skeleton code starts the FastAPI servers but has no endpoints implemented. Hitting POST /start or GET /state will return 404. Your first task is to get the endpoints working — then implement the coordination logic.

Verify Services Are Running

Once you've implemented your endpoints, verify they respond:

# Check worker state
curl http://localhost:5001/state
# Should return: {"worker_id": 1, "clock": 0, "state": "RELEASED", ...}

# Check inventory
curl http://localhost:8000/inventory/summary
# Should return: {"total_items": 200, "available": 200, "sold": 0, "orders_processed": 0}

See the Problem (No Coordination)

To see why coordination matters, try implementing POST /start without acquire() / release() — just query an available item and process it directly. Then trigger all workers simultaneously:

for i in 1 2 3; do
    curl -X POST http://localhost:500$i/start &
done
wait
curl http://localhost:8000/inventory/summary

Expected: Oversells

Without coordination, multiple workers will process the same items simultaneously. You should see more orders processed than items available — this is the problem the coordination protocol solves.


Worker Service

Each worker is a FastAPI microservice. You are given a minimal skeleton — the FastAPI app, environment variables, and empty stubs. You build everything else.

What's Provided

from fastapi import FastAPI
import os
import requests
import time
import threading

app = FastAPI()

# --- Environment Variables (set by Docker Compose) ---
WORKER_ID = int(os.environ["WORKER_ID"])
NUM_WORKERS = int(os.environ["NUM_WORKERS"])
NUM_ORDERS = int(os.environ.get("NUM_ORDERS", "50"))
INVENTORY_URL = os.environ.get("INVENTORY_URL", "http://inventory-service:8000")

def get_peer_url(worker_id: int) -> str:
    """Returns the base URL for a peer worker."""
    return f"http://worker-{worker_id}:{5000 + worker_id}"

# --- Coordination State ---
# Use state_lock to protect all coordination state.
# Multiple threads access this state: the /start processing loop,
# the /request handler, and the /reply handler.

state_lock = threading.Lock()
clock = 0
state = "RELEASED"          # "RELEASED", "WANTED", or "HELD"
request_timestamp = None    # Clock value when we requested access
replies_received = 0        # Count of REPLY messages for current request
deferred_queue = []         # Worker IDs whose REPLY we deferred

# Event to signal when all replies have been received
all_replies = threading.Event()

# ============================================================
#  TODO: Implement the coordination protocol and endpoints
# ============================================================

# --- Coordination Functions ---

def acquire():
    """
    Request access to the critical section.
    Sends REQUEST to all peers, then waits until all have replied.
    """
    # TODO
    pass

def release():
    """
    Release access to the critical section.
    Sends deferred REPLY messages to all waiting peers.
    """
    # TODO
    pass

# --- Message Handlers ---

# TODO: Implement POST /request
# Receives a REQUEST(timestamp, sender_id) from a peer.
# Must decide: reply immediately or defer.

# TODO: Implement POST /reply
# Receives a REPLY(timestamp) from a peer.
# Must update clock and track reply count.

# --- Other Endpoints ---

# TODO: Implement GET /state
# Must return: {"worker_id": ..., "clock": ..., "state": ...,
#               "request_timestamp": ..., "replies_received": ...,
#               "deferred_count": ...}

# TODO: Implement POST /start
# Must: loop NUM_ORDERS times, acquire, get available item,
#        call POST /process-order with idempotency_key, release.
#        Stop early if no items available.
# Must return: {"status": "completed", "worker_id": ..., "orders_processed": ...}

# TODO: Implement GET /peers
# Must return: {"worker_id": ..., "peers": [...]}

What You Must Build

Using the skeleton above, you implement:

  • acquire() — set state, increment clock, send REQUESTs, wait for all REPLYs
  • release() — set state, send deferred REPLYs
  • POST /request — receive a REQUEST, decide to reply or defer
  • POST /reply — receive a REPLY, update clock, track count
  • GET /state — return coordination state for debugging and grading
  • POST /start — order processing loop (acquire → query items → process → release)
  • GET /peers — return peer URLs
  • All HTTP calls — to peers and to the inventory service. Use the requests library

Function names are mandatory

Your worker.py must contain acquire and release as callable functions. The auto-grader verifies correct behavior through the HTTP endpoints and the access_log in the database.

Threading is critical

The POST /request and POST /reply endpoints receive messages from peers while your acquire() is blocking. FastAPI handles these in separate threads. Use state_lock to protect shared state and all_replies (a threading.Event) to signal when all replies arrive. Never hold state_lock while making HTTP calls — this can deadlock.


Inventory Service

The inventory service manages the shared inventory database. Most of it is provided — you implement one endpoint with a real distributed systems concern: idempotent order processing.

Why Idempotency?

In a distributed system, HTTP requests can fail in ambiguous ways. Consider this scenario:

  1. Worker 1 sends POST /process-order to sell item 47
  2. The inventory service processes the order successfully
  3. The HTTP response is lost (network timeout)
  4. Worker 1 doesn't know if the order went through — it retries
  5. Without idempotency, item 47 gets processed twice

Production APIs (Stripe, AWS, any payment system) solve this with idempotency keys: a unique identifier for each logical operation. If the service sees the same key twice, it returns the original result instead of re-processing.
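Stripped of the database, the key-check pattern looks like this. This is an in-memory stand-in for the order_history lookup you will write in SQL — the `results` dict and this `process_order` helper are illustrative, not part of the provided code:

```python
# Conceptual sketch of idempotency keys. An in-memory dict stands in for
# the order_history table; your real implementation queries PostgreSQL.
results = {}  # idempotency_key -> original response

def process_order(key, item_id):
    if key in results:
        # Seen this key before: return the original result, flagged duplicate
        return {**results[key], "duplicate": True}
    response = {"success": True, "order_id": len(results) + 1,
                "item_id": item_id, "duplicate": False}
    results[key] = response  # record the result before returning it
    return response
```

Retrying with the same key returns the same order_id instead of creating a second order.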

Database Schema

Four tables are provided (in database/init.sql):

inventory table — 200 items, each with quantity = 1.

Column Type Description
item_id SERIAL PRIMARY KEY Unique item identifier
item_name VARCHAR(100) Item name
quantity INTEGER Units in stock (starts at 1)
status VARCHAR(20) 'available' or 'sold_out'

order_history table — Records every successful order.

Column Type Description
order_id SERIAL PRIMARY KEY Auto-incrementing order ID
item_id INTEGER REFERENCES inventory Which item was ordered
worker_id INTEGER Which worker processed it
idempotency_key VARCHAR(100) UNIQUE Unique key for this operation
processed_at TIMESTAMP DEFAULT NOW() When it was processed

access_log table — Audit trail for mutual exclusion verification.

Column Type Description
log_id SERIAL PRIMARY KEY Auto-incrementing log ID
worker_id INTEGER Which worker
action VARCHAR(50) 'enter' or 'exit'
timestamp TIMESTAMP DEFAULT NOW() When it occurred

What's Provided

The following endpoints are already implemented in the skeleton:

  • GET /inventory/available — Returns available items
  • GET /inventory/summary — Returns inventory counts
  • GET /access-log — Returns the access audit log

The only helper is get_db_connection(), which returns a psycopg2 connection to PostgreSQL.

What You Implement

POST /process-order — The critical section operation. Processes a single order idempotently.

Request body:

{
    "worker_id": 2,
    "item_id": 47,
    "idempotency_key": "worker-2-order-15"
}

This endpoint must:

  1. Log entry — Write to access_log with action 'enter'
  2. Check idempotency — Query order_history for the idempotency_key. If it already exists, skip processing and return the original result (the order that was already created)
  3. Validate — Check that the item exists and is still available
  4. Process — Update the inventory table (quantity = 0, status = 'sold_out') and insert into order_history (including the idempotency_key)
  5. Log exit — Write to access_log with action 'exit'

Response (success — first attempt):

{
    "success": true,
    "order_id": 142,
    "item_id": 47,
    "worker_id": 2,
    "duplicate": false
}

Response (success — duplicate request, same idempotency key):

{
    "success": true,
    "order_id": 142,
    "item_id": 47,
    "worker_id": 2,
    "duplicate": true
}

Response (item unavailable):

{
    "success": false,
    "reason": "item_unavailable",
    "item_id": 47
}

Testing

Running the Tests

Start the system:

docker compose up --build -d

Trigger workers:

for i in 1 2 3; do
    curl -X POST http://localhost:500$i/start &
done
wait

Check results:

curl http://localhost:8000/inventory/summary

What You Should Test

Before submitting, make sure your system handles the following correctly:

Mutual Exclusion — Verify that no two workers are inside the critical section at the same time. If your coordination logic is wrong, you will see overlapping entries.

Correctness — Verify that no item is sold more than once (no oversells), that the total orders processed matches the number of sold items, and that every order corresponds to a valid inventory change.

Fairness — Run the system multiple times with varying worker counts and verify that all workers eventually complete (no starvation) and no single worker processes a disproportionate share of orders.

Idempotency — Verify that sending duplicate order requests with the same idempotency key only decrements inventory once, returns the same order ID, and correctly indicates the request was a duplicate.
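For the mutual-exclusion check, one approach is a small script that walks the entries from GET /access-log in time order and confirms enters and exits strictly alternate. The field names are taken from the access_log schema above; the entry format is otherwise assumed:

```python
# Verify mutual exclusion from an access_log trace. Assumes entries are
# time-ordered dicts with "worker_id" and "action" keys, per the schema.
def check_mutual_exclusion(entries):
    inside = None  # worker currently in the critical section, if any
    for e in entries:
        if e["action"] == "enter":
            if inside is not None:  # two workers inside at once: violation
                return False
            inside = e["worker_id"]
        elif e["action"] == "exit":
            if inside != e["worker_id"]:  # exit without matching enter
                return False
            inside = None
    return inside is None  # every enter was matched by an exit
```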

Testing Tip

The auto-grader will evaluate all of the above. Use curl or a simple script to exercise these scenarios yourself before submitting.


API Reference

Worker Endpoints

Endpoint Method Description
/state GET Returns {worker_id, clock, state, request_timestamp, replies_received, deferred_count}
/start POST Triggers order processing, returns {status, worker_id, orders_processed}
/request POST Receives {timestamp, sender_id} — handles incoming REQUEST
/reply POST Receives {timestamp, sender_id} — handles incoming REPLY
/peers GET Returns {worker_id, peers[]}

Inventory Service Endpoints

Endpoint Method Description
/inventory/available GET Returns {items[], count} (provided)
/process-order POST Accepts {worker_id, item_id, idempotency_key}, returns {success, order_id?, duplicate?, reason?}
/inventory/summary GET Returns {total_items, available, sold, orders_processed} (provided)
/access-log GET Returns {entries[], count} (provided)

Worker Functions (must exist in worker.py)

Function Description
acquire() Sends REQUESTs, waits for all REPLYs
release() Sends deferred REPLYs, resets state

How grading works

The auto-grader tests your coordination logic by hitting POST /start on all workers simultaneously, then checking GET /access-log for mutual exclusion violations and GET /inventory/summary for oversells. Idempotency is tested by sending duplicate POST /process-order requests directly.


Deliverables

Submit the following files:

File Description
worker/worker.py Your complete worker with message-based coordination and all endpoints
inventory_service/inventory_server.py Your complete inventory service with idempotent POST /process-order
a3.pdf Your answers to Questions 1–4

Do not modify provided files

Do not modify helpers.py, init.sql, Dockerfile, requirements.txt, docker-compose.yml. Your submission will be tested against the original versions of these files.


Hints

Hint 1: Start small

Get mutual exclusion working with 2 workers first. With 2 workers, there's only 1 peer — 1 REQUEST sent, 1 REPLY received. Much easier to debug.

Hint 2: Threading is the hardest part

acquire() blocks waiting for replies. POST /request and POST /reply arrive on separate threads. Use threading.Event to signal between them — acquire() calls all_replies.wait(), and the /reply handler calls all_replies.set() when the count reaches N-1.

Hint 3: Never hold the lock during HTTP calls

Acquire state_lock, update state, release state_lock, then send HTTP messages. If you hold the lock while sending a REQUEST, and the peer's handler tries to acquire the lock to process it — deadlock.
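A sketch of this discipline, in the spirit of the skeleton. The `peer_urls` list and the `send` callback (which would wrap requests.post in your worker) are illustrative placeholders, not part of the provided code:

```python
import threading

state_lock = threading.Lock()

def broadcast_request(peer_urls, timestamp, worker_id, send):
    """send(url, payload) performs the HTTP POST (e.g. via requests.post)."""
    # 1. Touch shared state only while holding the lock -- no network I/O here.
    with state_lock:
        payload = {"timestamp": timestamp, "sender_id": worker_id}
    # 2. Lock released: now it is safe to make blocking HTTP calls.
    for url in peer_urls:
        send(url + "/request", payload)
```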

Hint 4: Test idempotency independently

Before wiring up the coordination logic, test your POST /process-order with curl. Send the same request twice with the same idempotency_key and verify you get "duplicate": true on the second call with the same order_id.

Hint 5: Use logging liberally

Print every message sent and received with clock values: [Worker 1, clock=5] Sending REQUEST to Worker 2. This trace will save you hours of debugging.


Concepts to Understand

This assignment covers the following concepts from lectures:

  • Mutual exclusion — the safety property that at most one process is in its critical section
  • Lamport logical clocks — a mechanism for ordering events in a distributed system without synchronized physical clocks
  • Happened-before relation — the causal ordering that logical clocks capture
  • Message complexity — 2(N-1) messages per critical section entry, and why this is optimal for a permission-based protocol
  • Idempotency — ensuring that repeated requests produce the same result, critical for reliability in distributed systems
  • Single point of failure — the fundamental weakness of centralized coordination (Assignment 2) and why decentralized approaches matter

Academic Integrity

This is an individual assignment. You may discuss high-level concepts with classmates, but all code must be your own work. Do not share code or look at other students' implementations. Violations will result in a grade of zero and referral to the Academic Integrity Office.


Resources

  • Course lecture notes on distributed mutual exclusion and logical clocks
  • Textbook: Tanenbaum & van Steen, Chapter 6 (Coordination)