CS 4459: Sharded MongoDB Cluster
Due Date
April 6, 11:59 PM
Late Submissions
Late submissions accepted with penalty until April 10, 11:59 PM. Late coupons may be applied. No submissions accepted after April 10.
Replication keeps data safe, but it does not address scale. When a dataset grows beyond what a single machine can handle, you need sharding: partitioning data across multiple independent nodes, each holding a subset. Queries are routed automatically to the nodes that hold the relevant data, and load is spread across them.
In this assignment you will design and operate a production-grade sharded MongoDB cluster for a ride-sharing platform. Each shard is a 3-node replica set, giving you horizontal scalability and fault tolerance simultaneously.
Learning Objectives
- Configure a sharded cluster with config servers, shard replica sets, and a mongos router
- Understand why each shard is itself a replica set in production
- Choose between ranged and hash sharding based on access patterns
- Select a sharding key that distributes load evenly
- Observe chunk splitting and migration as data grows
- Reason about hotspots and fault tolerance in writing
- Demonstrate failure transparency experimentally
The Domain
RideFlow is a ride-sharing platform operating in multiple cities and processing millions of trips per day. It stores its data in two collections:
trips — one document per completed trip:
{
"_id": ObjectId("..."),
"trip_id": "TRP-00042",
"driver_id": "DRV-8821",
"rider_id": "RDR-3301",
"city": "Toronto",
"start_time": ISODate("2024-03-15T09:14:00Z"),
"end_time": ISODate("2024-03-15T09:38:00Z"),
"distance_km": 12.4,
"fare_usd": 18.75,
"status": "completed"
}
drivers — one document per registered driver, updated frequently:
{
"_id": ObjectId("..."),
"driver_id": "DRV-8821",
"name": "Maria Chen",
"city": "Toronto",
"rating": 4.87,
"trips_completed": 1423,
"status": "available",
"subscription_month": 3
}
The dataset contains 10,000 trips and 5,000 drivers spread evenly across five cities (Toronto, Vancouver, Montreal, Calgary, Ottawa) spanning January–March 2024.
Cluster Architecture
| Component | Role | Count |
|---|---|---|
| Config servers (configRS) | Store cluster metadata and chunk maps | 3 |
| Shard 0 (shard0RS) | Store a subset of data, replicated | 3 |
| Shard 1 (shard1RS) | Store a subset of data, replicated | 3 |
| mongos | Route queries to the correct shard | 1 |
Total: 10 containers.
graph TB
Client([Python Client]) --> MS
subgraph Router["Query Router"]
MS[mongos :27023]
end
subgraph Config["Config Server Replica Set — configRS"]
CS0[configsvr0 :27020]
CS1[configsvr1 :27021]
CS2[configsvr2 :27022]
end
subgraph Shard0["Shard 0 Replica Set — shard0RS"]
S0P[shard0a :27017 PRIMARY]
S0S1[shard0b :27018 SECONDARY]
S0S2[shard0c :27019 SECONDARY]
end
subgraph Shard1["Shard 1 Replica Set — shard1RS"]
S1P[shard1a :27027 PRIMARY]
S1S1[shard1b :27028 SECONDARY]
S1S2[shard1c :27029 SECONDARY]
end
MS --> CS0
MS --> S0P
MS --> S1P
CS0 --- CS1 --- CS2
S0P --- S0S1 --- S0S2
S1P --- S1S1 --- S1S2
Prerequisites
Add all container names to /etc/hosts so your host machine can resolve them:
sudo sh -c 'echo "127.0.0.1 configsvr0 configsvr1 configsvr2 shard0a shard0b shard0c shard1a shard1b shard1c mongos" >> /etc/hosts'
mongos and the replica sets identify every node by its container name (for example, shard0a:27017), so your Python client on the host must be able to resolve those names.
WSL2 users
Run the hosts command inside WSL and add the same line to C:\Windows\System32\drivers\etc\hosts on Windows.
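A quick preflight check can confirm that the hosts entries took effect before you start the cluster. This is a minimal sketch using only Python's standard socket module; CONTAINER_NAMES mirrors the hosts line above, and unresolved_hosts is a hypothetical helper name.

```python
import socket

# Container names from the /etc/hosts line above.
CONTAINER_NAMES = [
    "configsvr0", "configsvr1", "configsvr2",
    "shard0a", "shard0b", "shard0c",
    "shard1a", "shard1b", "shard1c",
    "mongos",
]


def unresolved_hosts(names):
    """Return the subset of names the local resolver cannot look up."""
    missing = []
    for name in names:
        try:
            socket.gethostbyname(name)
        except OSError:  # socket.gaierror is a subclass of OSError
            missing.append(name)
    return missing
```

Once the entries are in place, unresolved_hosts(CONTAINER_NAMES) should return an empty list; any name it returns is missing from the hosts file your client actually uses.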
Project Structure
Getting Started
Clone the assignment repository:
cs4459_a4/
├── docker-compose.yml ← YOU WRITE
├── init.sh ← YOU WRITE (starter provided)
├── shard.py ← YOU WRITE
├── load.py ← PROVIDED
├── trips.json ← PROVIDED
├── drivers.json ← PROVIDED
└── a4.pdf ← YOU WRITE
Workflow:
docker compose up -d
bash init.sh
python shard.py ← your sharding decisions
python load.py ← inserts the provided dataset
Part A — Docker Compose
Write docker-compose.yml launching the full 10-container cluster.
Config servers (configRS):
- Services: configsvr0, configsvr1, configsvr2
- Command: mongod --configsvr --replSet configRS --bind_ip_all
- Ports: 27020, 27021, 27022
Shard 0 (shard0RS):
- Services: shard0a, shard0b, shard0c
- Command: mongod --shardsvr --replSet shard0RS --bind_ip_all
- Ports: 27017, 27018, 27019
Shard 1 (shard1RS):
- Services: shard1a, shard1b, shard1c
- Command: mongod --shardsvr --replSet shard1RS --bind_ip_all
- Ports: 27027, 27028, 27029
mongos router:
- Service: mongos
- Command: mongos --configdb configRS/configsvr0:27020,configsvr1:27021,configsvr2:27022 --bind_ip_all
- Port: 27023
- depends_on: all three config servers
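Network: all 10 services on a single bridge network named rideflow-net. Use image mongo:7. Each node must listen on its listed port inside its container and publish that same port to the host (for example, add --port 27020 to configsvr0's command and map 27020:27020). The replica set configurations and the /etc/hosts entries refer to these host:port pairs, and without --port a --configsvr node listens on 27019, a --shardsvr node on 27018, and mongos on 27017.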
mongos uses a different binary
mongos is not mongod. It does not take --configsvr or --shardsvr. Its key flag is --configdb, formatted as replicaSetName/host:port,....
Shard replica set names must differ
shard0RS and shard1RS hold different data. Do not reuse the same --replSet name for both shards.
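Because ten services differ only in role, replica set name, and port, generating the commands can prevent copy-paste slips. This is a minimal sketch with hypothetical helper names (mongod_command, mongos_command); the flags themselves are the ones listed above, plus an explicit --port on the assumption that each node listens inside its container on the same port it publishes.

```python
def mongod_command(role, repl_set, port):
    """Build the argv for a config server ("configsvr") or shard member ("shardsvr")."""
    if role not in ("configsvr", "shardsvr"):
        raise ValueError(f"unknown role: {role}")
    return [
        "mongod", f"--{role}",
        "--replSet", repl_set,
        "--port", str(port),
        "--bind_ip_all",
    ]


def mongos_command(config_rs, config_members, port):
    """Build the argv for mongos; --configdb takes replicaSetName/host:port,..."""
    seeds = ",".join(f"{host}:{p}" for host, p in config_members)
    return [
        "mongos",
        "--configdb", f"{config_rs}/{seeds}",
        "--port", str(port),
        "--bind_ip_all",
    ]


CONFIG_MEMBERS = [("configsvr0", 27020), ("configsvr1", 27021), ("configsvr2", 27022)]

print(" ".join(mongod_command("configsvr", "configRS", 27020)))
print(" ".join(mongos_command("configRS", CONFIG_MEMBERS, 27023)))
```

In docker-compose.yml, the same lists can be written as each service's command in list (exec) form.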
Verify: run docker compose ps and confirm that all 10 containers are running.
Part B — Cluster Initialization
Starting the containers is not enough. Each replica set must be initialized, shards must be registered with mongos, and sharding must be enabled on the database.
Write init.sh using the starter below. Each docker exec call runs a self-contained mongosh --eval block — you do not open an interactive shell.
Starter: init.sh
#!/bin/bash
set -e
echo "==> Step 1: Init config server replica set"
docker exec configsvr0 mongosh --port 27020 --eval '
rs.initiate({
_id: "configRS",
configsvr: true,
members: [
{ _id: 0, host: "configsvr0:27020" },
{ _id: 1, host: "configsvr1:27021" },
{ _id: 2, host: "configsvr2:27022" }
]
})
'
sleep 5
echo "==> Step 2: Init shard0RS"
# YOUR CODE HERE
# Connect to shard0a on port 27017
# Call rs.initiate() with _id "shard0RS" and all three shard0 members
sleep 5
echo "==> Step 3: Init shard1RS"
# YOUR CODE HERE
# Connect to shard1a on port 27027
# Call rs.initiate() with _id "shard1RS" and all three shard1 members
sleep 5
echo "==> Step 4: Register shards and enable sharding"
# YOUR CODE HERE
# Connect to mongos on port 27023
# Call sh.addShard() for both replica sets — format: "rsName/host:port,host:port,host:port"
# Call sh.enableSharding("rideflow")
echo "Done."
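Steps 2 and 3 differ from Step 1 only in the replica set name, the member list, and the absence of configsvr: true. A small Python sketch (rs_initiate_config is a hypothetical helper) makes that pattern explicit; the dict has the same shape as the object passed to rs.initiate(), and the same document could be sent from PyMongo as the replSetInitiate admin command over a direct connection to the first member.

```python
import json


def rs_initiate_config(rs_name, members, configsvr=False):
    """Return a replica set config in the shape rs.initiate() expects.

    members is a list of (hostname, port) pairs; member _ids are assigned 0, 1, 2, ...
    """
    config = {
        "_id": rs_name,
        "members": [
            {"_id": i, "host": f"{host}:{port}"} for i, (host, port) in enumerate(members)
        ],
    }
    if configsvr:
        # Only the config server replica set carries this flag.
        config["configsvr"] = True
    return config


shard0 = rs_initiate_config(
    "shard0RS", [("shard0a", 27017), ("shard0b", 27018), ("shard0c", 27019)]
)
# JSON is a valid JavaScript object literal, so this can be embedded in
# a mongosh --eval string as rs.initiate(<printed JSON>).
print(json.dumps(shard0))
```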
sh.addShard() format
sh.addShard("shard0RS/shard0a:27017,shard0b:27018,shard0c:27019")
This tells mongos to treat the shard as a replica set and track its primary automatically.
Check election before proceeding
After each rs.initiate(), verify that a primary has been elected before moving on to the next step.
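Rather than relying only on the fixed sleep 5 pauses, you can poll until a primary appears. The sketch below takes the status fetcher as a parameter so it can be exercised without a cluster; with PyMongo, one assumed choice is lambda: client.admin.command("replSetGetStatus") on a MongoClient created with directConnection=True, which returns the same document as rs.status(). The helper names are hypothetical.

```python
import time


def primary_host(status):
    """Return the name of the PRIMARY member in an rs.status()-style document, or None."""
    for member in status.get("members", []):
        # stateStr is "PRIMARY" (numeric state 1) for the elected primary.
        if member.get("stateStr") == "PRIMARY":
            return member.get("name")
    return None


def wait_for_primary(get_status, timeout=60.0, interval=1.0):
    """Poll get_status() until a primary is reported; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            host = primary_host(get_status())
        except Exception:
            # The set may not answer yet, e.g. immediately after rs.initiate().
            host = None
        if host is not None:
            return host
        if time.monotonic() >= deadline:
            raise TimeoutError("no primary elected before timeout")
        time.sleep(interval)
```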
Verify: run sh.status() through mongos and confirm that both shards appear with state: 1, the balancer is enabled, and the rideflow database is listed.
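The shard check can also be scripted. Through mongos, the listShards admin command returns a shards array whose entries carry _id, host, and state. This sketch (shards_ready is a hypothetical helper) checks that every expected shard is registered with state 1; the sample reply is an invented illustration of the shape, not captured output.

```python
def shards_ready(list_shards_reply, expected):
    """True if every expected shard _id appears in the reply with state 1."""
    states = {s["_id"]: s.get("state") for s in list_shards_reply.get("shards", [])}
    return all(states.get(name) == 1 for name in expected)


# Illustrative reply shape only; a real reply also includes fields such as topologyTime.
sample = {
    "shards": [
        {"_id": "shard0RS", "host": "shard0RS/shard0a:27017,shard0b:27018,shard0c:27019", "state": 1},
        {"_id": "shard1RS", "host": "shard1RS/shard1a:27027,shard1b:27028,shard1c:27029", "state": 1},
    ],
    "ok": 1,
}
print(shards_ready(sample, ["shard0RS", "shard1RS"]))  # True
```

With PyMongo connected to mongos, the reply would come from client.admin.command("listShards").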
Part C — Sharding Design
Design your sharding strategy for both collections before writing any implementation code. Record your decisions in a4.pdf.
trips collection
Access patterns:
- Operations staff query trips by city and date range
- Fraud detection queries all trips by a specific driver
- Reporting aggregates trips by city and month
Choose a sharding key and strategy (ranged or hashed). Justify both. Identify which query types will be targeted (routed to one shard) vs. broadcast (sent to all shards), and explain why that matters at scale.
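To make the targeted/broadcast distinction concrete, the toy model below represents a ranged chunk map as sorted lower bounds mapped to shards, loosely like the routing table mongos caches from the config servers. It is an illustration, not MongoDB's implementation, and trip_id with these boundaries is a hypothetical example key, not a recommended design: a filter containing the key is resolved with a binary search, while a filter without it must go to every shard.

```python
import bisect

# Hypothetical ranged chunk map: chunk i covers [LOWER_BOUNDS[i], LOWER_BOUNDS[i + 1]).
# The first bound "" plays the role of MinKey for string keys.
LOWER_BOUNDS = ["", "TRP-02500", "TRP-05000", "TRP-07500"]
CHUNK_SHARDS = ["shard0RS", "shard0RS", "shard1RS", "shard1RS"]
ALL_SHARDS = sorted(set(CHUNK_SHARDS))


def shards_for(filter_doc, shard_key):
    """Return the shards an equality find() with this filter would be sent to."""
    if shard_key not in filter_doc:
        return list(ALL_SHARDS)  # broadcast: every shard must be asked
    value = filter_doc[shard_key]
    # Rightmost chunk whose lower bound is <= value.
    index = bisect.bisect_right(LOWER_BOUNDS, value) - 1
    return [CHUNK_SHARDS[index]]  # targeted: one chunk, so one shard


print(shards_for({"trip_id": "TRP-08821"}, "trip_id"))         # ['shard1RS']
print(shards_for({"fare_usd": {"$gt": 20}}, "trip_id"))        # ['shard0RS', 'shard1RS']
```

A range condition on the key would map to every chunk it overlaps, which can still be a subset of the shards; only a filter that omits the key entirely forces a full broadcast.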
drivers collection
Access patterns:
- Every completed trip updates the driver document (trips_completed, status, rating)
- Riders look up a driver by driver_id
- New drivers are added continuously as the platform expands
Choose a sharding key and strategy. Justify both. Explain why your key distributes writes evenly, and what would go wrong if you chose city instead.
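The write-distribution argument can be checked with a small simulation. It assumes, hypothetically, that driver IDs are issued in increasing order and that a two-chunk ranged layout splits the existing IDs at DRV-02500, so every new ID sorts past the last boundary. The hashed case uses the first 8 bytes of an MD5 digest as a stand-in 64-bit hash; this does not reproduce MongoDB's actual hashed-index values.

```python
import hashlib
from collections import Counter


def hash64(value):
    """Stand-in signed 64-bit hash (NOT MongoDB's exact hash function)."""
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "little", signed=True)


def ranged_shard(driver_id, split_point="DRV-02500"):
    # Hypothetical ranged layout: IDs below the split on shard0RS, the rest on shard1RS.
    return "shard0RS" if driver_id < split_point else "shard1RS"


def hashed_shard(driver_id):
    # Hashed layout: negative half of the 64-bit space on shard0RS, the rest on shard1RS.
    return "shard0RS" if hash64(driver_id) < 0 else "shard1RS"


# Hypothetical IDs for 1,000 newly registered drivers, issued in increasing order.
new_ids = [f"DRV-{n:05d}" for n in range(5001, 6001)]

print("ranged:", Counter(ranged_shard(i) for i in new_ids))
print("hashed:", Counter(hashed_shard(i) for i in new_ids))
```

Under the ranged layout every new driver lands on one shard; under the hashed layout the same IDs scatter across both halves of the hash space.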
Implementation: shard.py
Write shard.py to implement your sharding decisions. For each collection:
- Set the chunk size to 1 MB before any data is loaded.
- Create the required index on your sharding key:
  - Ranged: db.collection.createIndex({ key: 1 })
  - Hashed: db.collection.createIndex({ key: "hashed" })
- Shard the collection: sh.shardCollection("rideflow.collection", { key: ... })
Run shard.py once after init.sh. Then run load.py.
Note
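MongoDB requires the shard key index to exist before sh.shardCollection() will succeed on a collection that already contains data. For an empty collection, MongoDB creates the index automatically, but creating it explicitly in shard.py is harmless and keeps the requirement visible.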
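The steps above can be expressed as plain Python data before any PyMongo calls are made. This is a minimal sketch: shard_plan and your_key are hypothetical placeholders, not a recommended design. The documents follow the mongosh helpers listed above: the chunksize entry in config.settings (value in megabytes, and cluster-wide, so it only needs to be written once), an index key list in the form PyMongo's create_index() accepts, and the shardCollection admin command.

```python
def shard_plan(db_name, coll_name, key_field, strategy, chunk_size_mb=1):
    """Return the documents shard.py would send for one collection.

    strategy is "ranged" or "hashed". Nothing here talks to MongoDB.
    """
    if strategy == "ranged":
        direction = 1
    elif strategy == "hashed":
        direction = "hashed"
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    return {
        # For client.config.settings.update_one(filter, update, upsert=True).
        "chunksize_filter": {"_id": "chunksize"},
        "chunksize_update": {"$set": {"value": chunk_size_mb}},
        # For client[db_name][coll_name].create_index(index_keys).
        "index_keys": [(key_field, direction)],
        # For client.admin.command(shard_command), sent through mongos.
        "shard_command": {
            "shardCollection": f"{db_name}.{coll_name}",
            "key": {key_field: direction},
        },
    }


# "your_key" and "hashed" are placeholders for your own design decisions.
plan = shard_plan("rideflow", "trips", "your_key", "hashed")
print(plan["shard_command"])
```

With a MongoClient connected to mongos on port 27023, these would be applied in order: chunk size, then index, then shardCollection.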
Question 1 — Targeted vs. broadcast queries
After running load.py, run explain() on two queries:
// Query A — uses your sharding key
db.trips.find({ <your_shard_key>: <a_value> }).explain("executionStats")
// Query B — does not use your sharding key
db.trips.find({ fare_usd: { $gt: 20 } }).explain("executionStats")
Look at queryPlanner.winningPlan.stage in each output. Is Query A SINGLE_SHARD or SHARD_MERGE? Does this match your design? Explain the difference between the two outputs.
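If you run the two queries from PyMongo (for example with cursor.explain()) rather than mongosh, the routing stage can be pulled out programmatically. This sketch assumes the explain document has the mongos layout described above, with queryPlanner.winningPlan.stage and a winningPlan.shards array carrying a shardName per shard; routing_summary is a hypothetical helper, and the two sample documents are trimmed, invented illustrations rather than real output.

```python
def routing_summary(explain_doc):
    """Return (stage, shard names) from a mongos explain() document."""
    plan = explain_doc["queryPlanner"]["winningPlan"]
    shards = [s.get("shardName") for s in plan.get("shards", [])]
    return plan["stage"], shards


# Trimmed, hypothetical examples of the two shapes.
query_a = {"queryPlanner": {"winningPlan": {
    "stage": "SINGLE_SHARD",
    "shards": [{"shardName": "shard1RS"}]}}}
query_b = {"queryPlanner": {"winningPlan": {
    "stage": "SHARD_MERGE",
    "shards": [{"shardName": "shard0RS"}, {"shardName": "shard1RS"}]}}}

for name, doc in (("A", query_a), ("B", query_b)):
    stage, shards = routing_summary(doc)
    print(f"Query {name}: {stage} via {shards}")
```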
Question 2 — Chunk pre-splitting and distribution
Run sh.status() after load.py completes. You will notice the balancer reports no recent migrations and the chunk count is the same as immediately after shard.py ran.
Examine the chunk structure for both collections. How many chunks exist per collection, and how are they distributed across the two shards? Why did the balancer have nothing to do despite the entire trips dataset being inserted? What does this tell you about how MongoDB 7 allocates chunks for hash-sharded collections, and why is pre-splitting preferable to splitting chunks reactively as data grows?
Question 3 — Hash vs. ranged chunk boundaries
Compare chunk boundaries in sh.status() for your two collections. For the ranged-sharded collection, boundaries show actual field values. For the hash-sharded collection, boundaries show 64-bit integers. Explain why. What guarantee does hash sharding provide about data distribution that ranged sharding cannot?
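For intuition about integer boundaries: a hashed shard key value is a signed 64-bit integer, so chunk bounds are points in the range from -2^63 up to (but excluding) 2^63, with MongoDB displaying the outermost ends as MinKey and MaxKey. The sketch below divides that space into n equal ranges, as an even pre-split would. It is an illustration under that assumption; MongoDB's actual initial chunk count and exact bounds may differ by version, and even_hash_ranges is a hypothetical helper.

```python
LOW = -(2**63)
HIGH = 2**63  # exclusive upper end of the signed 64-bit range


def even_hash_ranges(n):
    """Split the signed 64-bit hash space into n contiguous [lower, upper) ranges.

    The last range absorbs any remainder when 2**64 is not divisible by n.
    """
    width = (HIGH - LOW) // n
    bounds = [LOW + i * width for i in range(n)] + [HIGH]
    return list(zip(bounds[:-1], bounds[1:]))


for lower, upper in even_hash_ranges(4):
    print(lower, upper)
```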
Failure Analysis
Question 4 — The hotspot scenario
RideFlow launches in New York, and within a week 80% of all new trips are in New York.
Assume trips was sharded by city using ranged sharding. Explain step by step why this creates a hotspot. Which shard is affected? What symptoms would you observe? Why does adding more shards not solve the problem? What would you change about your sharding design to handle this, and what query efficiency would you sacrifice?
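A back-of-the-envelope model can anchor the first part of this question. It assumes, hypothetically, that with city as the ranged key every New York trip falls in a key range owned by a single shard, that the remaining 20% of traffic is split evenly over the original five cities, and that adding shards only redistributes the other cities. write_share and both layouts are illustrative, not a MongoDB API.

```python
from collections import defaultdict


def write_share(city_share, city_to_shard):
    """Fraction of new trip writes landing on each shard, given a city -> shard map."""
    load = defaultdict(float)
    for city, share in city_share.items():
        load[city_to_shard[city]] += share
    return dict(load)


others = ["Toronto", "Vancouver", "Montreal", "Calgary", "Ottawa"]
city_share = {"New York": 0.80, **{c: 0.20 / len(others) for c in others}}

# Hypothetical contiguous ranges in sorted city order.
two_shards = {"Calgary": "shard0RS", "Montreal": "shard0RS", "New York": "shard0RS",
              "Ottawa": "shard1RS", "Toronto": "shard1RS", "Vancouver": "shard1RS"}
four_shards = {"Calgary": "shard0RS", "Montreal": "shard0RS", "New York": "shard1RS",
               "Ottawa": "shard2RS", "Toronto": "shard3RS", "Vancouver": "shard3RS"}

for layout in (two_shards, four_shards):
    share = write_share(city_share, layout)
    print(len(set(layout.values())), "shards, busiest share:", round(max(share.values()), 2))
```

Doubling the shard count only moves the busiest shard from 0.88 to 0.80 of all writes, because a single key value cannot be split across shards.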
Question 5 — Fault tolerance with replica set shards
shard0a (primary of shard0RS) crashes while a client is writing a trip document with w="majority".
Step through the sequence: What does mongos observe? What do shard0b and shard0c do? Does the client write succeed or fail? Once a new primary is elected, does mongos need to be restarted or reconfigured? How does this differ from what would have happened if shard0 had been a standalone node?
Question 6 — Demonstrating failure transparency
A key property of distributed systems is failure transparency — the ability of a system to hide node failures from clients and continue operating.
Design and execute an experiment that provides empirical evidence of this property in your cluster. You decide what to break, how to break it, and what evidence to collect.
Your answer must include:
- Your experimental design — what you did and why
- Screenshots showing cluster state before, during, and after the failure
- Evidence that queries returned results throughout, or a precise characterization of the failure window if they did not
- An explanation of what your results demonstrate about the cluster's fault tolerance guarantees
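One way to characterize a failure window precisely is to probe the cluster at a fixed interval throughout the experiment and record a timestamp and an outcome for every attempt. The sketch below separates probing from analysis so the analysis can be checked offline; run_query is whatever probe you choose (for example a PyMongo find_one() through mongos on a client with a short serverSelectionTimeoutMS), and probe and longest_outage are hypothetical helper names. The measured window is only as precise as the probe interval.

```python
import time


def probe(run_query, attempts, interval):
    """Call run_query() repeatedly; return a list of (timestamp, succeeded) samples."""
    samples = []
    for _ in range(attempts):
        started = time.monotonic()
        try:
            run_query()
            ok = True
        except Exception:
            ok = False
        samples.append((started, ok))
        time.sleep(interval)
    return samples


def _longer(a, b):
    """Return whichever (start, end) window is longer; a may be None."""
    if a is None or (b[1] - b[0]) > (a[1] - a[0]):
        return b
    return a


def longest_outage(samples):
    """Return (first failure, first success after it) for the longest failed run, or None.

    If the run lasts to the end of the samples, its end is the last failure seen.
    """
    best = None
    run_start = None
    last_failure = None
    for ts, ok in samples:
        if not ok:
            if run_start is None:
                run_start = ts
            last_failure = ts
        elif run_start is not None:
            best = _longer(best, (run_start, ts))
            run_start = None
    if run_start is not None:
        best = _longer(best, (run_start, last_failure))
    return best
```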
Grading
| Component | Weight |
|---|---|
| Part A — Docker Compose | 20% |
| Part B — Cluster initialization | 15% |
| Part C — Sharding design and implementation | 25% |
| Questions 1–6 | 40% |
Deliverables
| File | Description |
|---|---|
| docker-compose.yml | Full 10-container cluster |
| init.sh | Initializes all replica sets and registers shards |
| shard.py | Indexes and shards both collections |
| a4.pdf | Design justification, screenshots, and answers to Q1–6 |
Files must be submitted on OWL Brightspace.
Do not submit load.py, trips.json, or drivers.json.
PDF Requirements
Your a4.pdf must include, clearly labelled:
- Sharding design: the sharding key and strategy for each collection, with justification, written before implementation
- Screenshot: sh.status() after load.py completes, annotated to show the chunk count per shard
- Screenshot: explain() output for both Query A and Query B, with the stage field highlighted in each
- Answers to Questions 1–6, with screenshots for Question 6
Hints
Watch the balancer live
Run sh.status() repeatedly after load.py completes to watch chunks split and migrate.
Balancer runs in the background
After load.py finishes, the balancer may still be active. Check with sh.isBalancerRunning() before taking your final screenshot.
Key Concepts
| Term | Definition |
|---|---|
| Sharding key | Field used to determine which shard a document belongs to |
| Ranged sharding | Assignment based on contiguous key ranges; supports range queries but vulnerable to hotspots |
| Hash sharding | Assignment based on a hash of the key; uniform distribution but range queries become broadcast |
| Chunk | A contiguous range of sharding key values; MongoDB splits and migrates chunks to balance load |
| mongos | Query router; clients connect here, it routes to the correct shard |
| Config servers | Store the cluster's chunk map; run as a replica set |
| Targeted query | Includes the sharding key; routed only to the shard(s) owning the matching chunks (exactly one for an equality match) |
| Broadcast query | Does not include the sharding key; sent to all shards, results merged |
| Hotspot | A shard receiving a disproportionate share of traffic due to a poorly chosen sharding key |
| Failure transparency | The property that node failures are hidden from clients by the cluster |
Resources
- Course lecture notes on sharding.
- MongoDB Sharding Documentation
- Deploy a Sharded Cluster
- PyMongo Documentation
Academic Integrity
Individual assignment. Discussing concepts with classmates is fine; sharing code is not. Violations result in a grade of zero and referral to the Academic Integrity Office.