Code Examples
Code scaffolding and guidance for implementing the assignment. These are NOT complete solutions - you must implement the logic yourself.
Part 1: Basic Calculator
calculator.proto
syntax = "proto3";
package calculator;
service Calculator {
rpc Add(BinaryOperation) returns (Result);
rpc Subtract(BinaryOperation) returns (Result);
rpc Multiply(BinaryOperation) returns (Result);
rpc Divide(BinaryOperation) returns (Result);
}
message BinaryOperation {
double a = 1;
double b = 2;
}
message Result {
double value = 1;
string error = 2;
}
server.py
import grpc
from concurrent import futures
import sys
import os
# Add generated code to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc
class CalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
"""Calculator service implementation."""
def Add(self, request, context):
"""Add two numbers."""
# TODO: Implement addition
# Hint: Use request.a and request.b
# Return calculator_pb2.Result(value=...)
pass
def Subtract(self, request, context):
"""Subtract two numbers."""
# TODO: Implement subtraction
pass
def Multiply(self, request, context):
"""Multiply two numbers."""
# TODO: Implement multiplication
pass
def Divide(self, request, context):
"""Divide two numbers."""
# TODO: Check if request.b is zero
# If zero: set error code and return empty Result
# Otherwise: perform division and return result
pass
def serve():
"""Start the gRPC server."""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
calculator_pb2_grpc.add_CalculatorServicer_to_server(
CalculatorServicer(), server
)
server.add_insecure_port('[::]:50051')
print("Server starting on port 50051...")
server.start()
try:
server.wait_for_termination()
except KeyboardInterrupt:
print("\nShutting down gracefully...")
server.stop(grace=5)
if __name__ == '__main__':
serve()
client.py
import grpc
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc
def run_operation(stub, operation_name, a, b):
"""
Execute a calculator operation.
Args:
stub: gRPC stub
operation_name: Name of operation (Add, Subtract, etc.)
a: First operand
b: Second operand
"""
# TODO: Get the operation method from stub using getattr()
# TODO: Create BinaryOperation request
# TODO: Call the operation and handle response
# TODO: Handle grpc.RpcError exceptions
pass
def main():
"""Main client function."""
# TODO: Create insecure channel to localhost:50051
# TODO: Create Calculator stub
# TODO: Test all operations (Add, Subtract, Multiply, Divide)
# TODO: Test error case (divide by zero)
pass
if __name__ == '__main__':
main()
Part 2: Retry Logic
client_with_retry.py
import grpc
import time
import random
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc
def call_with_retry(stub, operation, request, max_retries=3, timeout=2.0):
"""
Call RPC with exponential backoff retry logic.
Args:
stub: gRPC stub
operation: RPC method to call
request: Request message
max_retries: Maximum retry attempts
timeout: Request timeout in seconds
Returns:
Response from successful call
Raises:
grpc.RpcError: If all retries fail
"""
# TODO: Loop through retry attempts
# TODO: Try to call operation with timeout
# TODO: On grpc.RpcError, check if it's DEADLINE_EXCEEDED
# TODO: Calculate exponential backoff: (2 ** attempt)
# TODO: Add jitter: random.uniform(0, 1)
# TODO: Sleep for backoff duration before retry
# TODO: If not DEADLINE_EXCEEDED, raise immediately (don't retry)
pass
def main():
"""Main client with retry logic."""
with grpc.insecure_channel('localhost:50051') as channel:
stub = calculator_pb2_grpc.CalculatorStub(channel)
try:
response = call_with_retry(
stub,
stub.Add,
calculator_pb2.BinaryOperation(a=10, b=5),
max_retries=3
)
print(f"Result: {response.value}")
except grpc.RpcError as e:
print(f"Operation failed: {e.details()}")
if __name__ == '__main__':
main()
server_with_failures.py
import grpc
from concurrent import futures
import random
import time
import argparse
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc
class CalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
"""Calculator with simulated failures."""
def __init__(self, failure_rate=0.3, slow_rate=0.2):
"""
Args:
failure_rate: Probability of crashing (0.0-1.0)
slow_rate: Probability of slow response (0.0-1.0)
"""
self.failure_rate = failure_rate
self.slow_rate = slow_rate
self.request_count = 0
def Add(self, request, context):
"""Add with simulated failures."""
# TODO: Implement similar to Part 1, but call _execute_with_failures
pass
def _execute_with_failures(self, request, context, operation):
"""Execute operation with failure simulation."""
self.request_count += 1
# TODO: Use random.random() to check against failure_rate
# If should fail: context.abort(grpc.StatusCode.UNAVAILABLE, message)
# TODO: Use random.random() to check against slow_rate
# If should be slow: time.sleep(3.0)
# TODO: Execute operation() and return Result
pass
def serve(failure_rate, slow_rate):
"""Start server with failure simulation."""
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
calculator_pb2_grpc.add_CalculatorServicer_to_server(
CalculatorServicer(failure_rate, slow_rate), server
)
server.add_insecure_port('[::]:50051')
print(f"Server starting with failure_rate={failure_rate}, slow_rate={slow_rate}")
server.start()
try:
server.wait_for_termination()
except KeyboardInterrupt:
server.stop(grace=5)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--failure-rate', type=float, default=0.3,
help='Probability of crashes (0.0-1.0)')
parser.add_argument('--slow-rate', type=float, default=0.2,
help='Probability of slow responses (0.0-1.0)')
args = parser.parse_args()
serve(args.failure_rate, args.slow_rate)
Part 3: Idempotency
Extended Protocol Buffer
// Add to calculator.proto
service Calculator {
// ... existing operations ...
rpc IncrementCounter(CounterRequest) returns (CounterResult);
rpc GetCounter(Empty) returns (CounterResult);
rpc ResetCounter(Empty) returns (CounterResult);
}
message CounterRequest {
string request_id = 1;
int32 increment = 2;
}
message CounterResult {
int32 value = 1;
bool was_duplicate = 2;
}
message Empty {}
server_with_idempotency.py
import uuid
from collections import OrderedDict
import time
class IdempotentCalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
"""Calculator with idempotent operations."""
def __init__(self, cache_size=1000, cache_ttl=300):
"""
Args:
cache_size: Maximum cached requests
cache_ttl: Cache time-to-live (seconds)
"""
# TODO: Initialize counter to 0
# TODO: Initialize request_cache (use OrderedDict for LRU)
# TODO: Store cache_size and cache_ttl
pass
def IncrementCounter(self, request, context):
"""Idempotent counter increment."""
# TODO: Check if request.request_id is in cache
# TODO: If cached and not expired, return cached result with was_duplicate=True
# TODO: Otherwise, increment counter
# TODO: Cache the result with timestamp
# TODO: Return CounterResult with was_duplicate=False
pass
def GetCounter(self, request, context):
"""Get current counter value."""
# TODO: Return current counter value
pass
def ResetCounter(self, request, context):
"""Reset counter to zero."""
# TODO: Reset counter to 0
# TODO: Clear request cache
pass
def _cache_request(self, request_id, result):
"""Cache a processed request."""
# TODO: Check if cache is full (>= cache_size)
# TODO: If full, remove oldest entry (popitem(last=False))
# TODO: Add new entry with (result, current_timestamp)
client_with_idempotency.py
import uuid
import time
def increment_with_retry(stub, increment=1, max_retries=3):
"""
Idempotent increment with retry.
Args:
stub: gRPC stub
increment: Amount to increment
max_retries: Maximum retry attempts
"""
# TODO: Generate request_id ONCE using uuid.uuid4()
# IMPORTANT: Use same request_id for all retry attempts!
# TODO: Loop through retry attempts
# TODO: Call stub.IncrementCounter with request_id and increment
# TODO: Check response.was_duplicate to see if it was a retry
# TODO: On timeout, retry with SAME request_id
# TODO: Use exponential backoff from Part 2
pass
Part 4: Circuit Breaker
circuit_breaker.py
from enum import Enum
import time
import threading
class CircuitState(Enum):
"""Circuit breaker states."""
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreakerOpen(Exception):
"""Raised when circuit breaker is open."""
pass
class CircuitBreaker:
"""Circuit breaker implementation."""
def __init__(self, failure_threshold=3, timeout=5.0):
"""
Args:
failure_threshold: Failures before opening
timeout: Seconds before attempting reset
"""
# TODO: Initialize failure_threshold and timeout
# TODO: Initialize failure_count = 0
# TODO: Initialize last_failure_time = None
# TODO: Initialize state = CircuitState.CLOSED
# TODO: Initialize threading.Lock() for thread safety
pass
def call(self, func, *args, **kwargs):
"""Execute function through circuit breaker."""
# TODO: Use lock to check state
# TODO: If OPEN, check if should_attempt_reset()
# - If yes, set state to HALF_OPEN
# - If no, raise CircuitBreakerOpen
# TODO: Try to execute func(*args, **kwargs)
# TODO: On success, call _on_success()
# TODO: On exception, call _on_failure() then re-raise
pass
def _should_attempt_reset(self):
"""Check if enough time passed for reset."""
# TODO: Check if last_failure_time is None
# TODO: Check if (current_time - last_failure_time) >= timeout
pass
def _on_success(self):
"""Handle successful call."""
# TODO: Reset failure_count to 0
# TODO: If state is HALF_OPEN, change to CLOSED
pass
def _on_failure(self):
"""Handle failed call."""
# TODO: Increment failure_count
# TODO: Update last_failure_time to current time
# TODO: If state is HALF_OPEN, set to OPEN
# TODO: If failure_count >= threshold, set to OPEN
pass
@property
def current_state(self):
"""Get current circuit state."""
# TODO: Return current state (use lock for thread safety)
pass
resilient_client.py
class ResilientCalculatorClient:
"""Calculator client with circuit breaker."""
def __init__(self, address='localhost:50051'):
# TODO: Create gRPC channel
# TODO: Create stub
# TODO: Initialize CircuitBreaker with threshold=3, timeout=5.0
pass
def add(self, a, b):
"""Add with circuit breaker protection."""
# TODO: Define inner function _rpc_call() that calls stub.Add
# TODO: Call circuit_breaker.call(_rpc_call)
# TODO: Catch CircuitBreakerOpen and return None (fail fast)
# TODO: Catch grpc.RpcError and handle appropriately
pass
def close(self):
"""Close the channel."""
# TODO: Close the gRPC channel
pass
Makefile (Optional)
.PHONY: proto server client test clean
proto:
python -m grpc_tools.protoc \
-I./proto \
--python_out=./generated \
--grpc_python_out=./generated \
./proto/calculator.proto
server:
python server.py
client:
python client.py
test:
python -m pytest tests/
clean:
rm -rf generated/
find . -type d -name __pycache__ -exec rm -rf {} +
find . -type f -name '*.pyc' -delete
Testing Examples
See Testing Guide for comprehensive test examples.