Skip to content

Code Examples

Code scaffolding and guidance for implementing the assignment. These are NOT complete solutions - you must implement the logic yourself.


Part 1: Basic Calculator

calculator.proto

syntax = "proto3";

package calculator;

service Calculator {
  rpc Add(BinaryOperation) returns (Result);
  rpc Subtract(BinaryOperation) returns (Result);
  rpc Multiply(BinaryOperation) returns (Result);
  rpc Divide(BinaryOperation) returns (Result);
}

message BinaryOperation {
  double a = 1;
  double b = 2;
}

message Result {
  double value = 1;
  string error = 2;
}

server.py

import grpc
from concurrent import futures
import sys
import os

# Add generated code to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc


class CalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
    """Calculator service implementation."""

    def Add(self, request, context):
        """Add two numbers."""
        # TODO: Implement addition
        # Hint: Use request.a and request.b
        # Return calculator_pb2.Result(value=...)
        pass

    def Subtract(self, request, context):
        """Subtract two numbers."""
        # TODO: Implement subtraction
        pass

    def Multiply(self, request, context):
        """Multiply two numbers."""
        # TODO: Implement multiplication
        pass

    def Divide(self, request, context):
        """Divide two numbers."""
        # TODO: Check if request.b is zero
        # If zero: set error code and return empty Result
        # Otherwise: perform division and return result
        pass


def serve():
    """Start the gRPC server."""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    calculator_pb2_grpc.add_CalculatorServicer_to_server(
        CalculatorServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    print("Server starting on port 50051...")
    server.start()

    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
        server.stop(grace=5)


if __name__ == '__main__':
    serve()

client.py

import grpc
import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc


def run_operation(stub, operation_name, a, b):
    """
    Execute a calculator operation.

    Args:
        stub: gRPC stub
        operation_name: Name of operation (Add, Subtract, etc.)
        a: First operand
        b: Second operand
    """
    # TODO: Get the operation method from stub using getattr()
    # TODO: Create BinaryOperation request
    # TODO: Call the operation and handle response
    # TODO: Handle grpc.RpcError exceptions
    pass


def main():
    """Main client function."""
    # TODO: Create insecure channel to localhost:50051
    # TODO: Create Calculator stub
    # TODO: Test all operations (Add, Subtract, Multiply, Divide)
    # TODO: Test error case (divide by zero)
    pass


if __name__ == '__main__':
    main()

Part 2: Retry Logic

client_with_retry.py

import grpc
import time
import random
import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc


def call_with_retry(stub, operation, request, max_retries=3, timeout=2.0):
    """
    Call RPC with exponential backoff retry logic.

    Args:
        stub: gRPC stub
        operation: RPC method to call
        request: Request message
        max_retries: Maximum retry attempts
        timeout: Request timeout in seconds

    Returns:
        Response from successful call

    Raises:
        grpc.RpcError: If all retries fail
    """
    # TODO: Loop through retry attempts
    # TODO: Try to call operation with timeout
    # TODO: On grpc.RpcError, check if it's DEADLINE_EXCEEDED
    # TODO: Calculate exponential backoff: (2 ** attempt)
    # TODO: Add jitter: random.uniform(0, 1)
    # TODO: Sleep for backoff duration before retry
    # TODO: If not DEADLINE_EXCEEDED, raise immediately (don't retry)
    pass


def main():
    """Main client with retry logic."""
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = calculator_pb2_grpc.CalculatorStub(channel)

        try:
            response = call_with_retry(
                stub,
                stub.Add,
                calculator_pb2.BinaryOperation(a=10, b=5),
                max_retries=3
            )
            print(f"Result: {response.value}")
        except grpc.RpcError as e:
            print(f"Operation failed: {e.details()}")


if __name__ == '__main__':
    main()

server_with_failures.py

import grpc
from concurrent import futures
import random
import time
import argparse
import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))
import calculator_pb2
import calculator_pb2_grpc


class CalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
    """Calculator with simulated failures."""

    def __init__(self, failure_rate=0.3, slow_rate=0.2):
        """
        Args:
            failure_rate: Probability of crashing (0.0-1.0)
            slow_rate: Probability of slow response (0.0-1.0)
        """
        self.failure_rate = failure_rate
        self.slow_rate = slow_rate
        self.request_count = 0

    def Add(self, request, context):
        """Add with simulated failures."""
        # TODO: Implement similar to Part 1, but call _execute_with_failures
        pass

    def _execute_with_failures(self, request, context, operation):
        """Execute operation with failure simulation."""
        self.request_count += 1

        # TODO: Use random.random() to check against failure_rate
        # If should fail: context.abort(grpc.StatusCode.UNAVAILABLE, message)

        # TODO: Use random.random() to check against slow_rate
        # If should be slow: time.sleep(3.0)

        # TODO: Execute operation() and return Result
        pass


def serve(failure_rate, slow_rate):
    """Start server with failure simulation."""
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    calculator_pb2_grpc.add_CalculatorServicer_to_server(
        CalculatorServicer(failure_rate, slow_rate), server
    )
    server.add_insecure_port('[::]:50051')

    print(f"Server starting with failure_rate={failure_rate}, slow_rate={slow_rate}")
    server.start()

    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(grace=5)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--failure-rate', type=float, default=0.3,
                        help='Probability of crashes (0.0-1.0)')
    parser.add_argument('--slow-rate', type=float, default=0.2,
                        help='Probability of slow responses (0.0-1.0)')
    args = parser.parse_args()

    serve(args.failure_rate, args.slow_rate)

Part 3: Idempotency

Extended Protocol Buffer

// Add to calculator.proto

service Calculator {
  // ... existing operations ...

  rpc IncrementCounter(CounterRequest) returns (CounterResult);
  rpc GetCounter(Empty) returns (CounterResult);
  rpc ResetCounter(Empty) returns (CounterResult);
}

message CounterRequest {
  string request_id = 1;
  int32 increment = 2;
}

message CounterResult {
  int32 value = 1;
  bool was_duplicate = 2;
}

message Empty {}

server_with_idempotency.py

import uuid
from collections import OrderedDict
import time


class IdempotentCalculatorServicer(calculator_pb2_grpc.CalculatorServicer):
    """Calculator with idempotent operations."""

    def __init__(self, cache_size=1000, cache_ttl=300):
        """
        Args:
            cache_size: Maximum cached requests
            cache_ttl: Cache time-to-live (seconds)
        """
        # TODO: Initialize counter to 0
        # TODO: Initialize request_cache (use OrderedDict for LRU)
        # TODO: Store cache_size and cache_ttl
        pass

    def IncrementCounter(self, request, context):
        """Idempotent counter increment."""
        # TODO: Check if request.request_id is in cache
        # TODO: If cached and not expired, return cached result with was_duplicate=True
        # TODO: Otherwise, increment counter
        # TODO: Cache the result with timestamp
        # TODO: Return CounterResult with was_duplicate=False
        pass

    def GetCounter(self, request, context):
        """Get current counter value."""
        # TODO: Return current counter value
        pass

    def ResetCounter(self, request, context):
        """Reset counter to zero."""
        # TODO: Reset counter to 0
        # TODO: Clear request cache
        pass

    def _cache_request(self, request_id, result):
        """Cache a processed request."""
        # TODO: Check if cache is full (>= cache_size)
        # TODO: If full, remove oldest entry (popitem(last=False))
        # TODO: Add new entry with (result, current_timestamp)

client_with_idempotency.py

import uuid
import time


def increment_with_retry(stub, increment=1, max_retries=3):
    """
    Idempotent increment with retry.

    Args:
        stub: gRPC stub
        increment: Amount to increment
        max_retries: Maximum retry attempts
    """
    # TODO: Generate request_id ONCE using uuid.uuid4()
    # IMPORTANT: Use same request_id for all retry attempts!

    # TODO: Loop through retry attempts
    # TODO: Call stub.IncrementCounter with request_id and increment
    # TODO: Check response.was_duplicate to see if it was a retry
    # TODO: On timeout, retry with SAME request_id
    # TODO: Use exponential backoff from Part 2
    pass

Part 4: Circuit Breaker

circuit_breaker.py

from enum import Enum
import time
import threading


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreakerOpen(Exception):
    """Raised when circuit breaker is open."""
    pass


class CircuitBreaker:
    """Circuit breaker implementation."""

    def __init__(self, failure_threshold=3, timeout=5.0):
        """
        Args:
            failure_threshold: Failures before opening
            timeout: Seconds before attempting reset
        """
        # TODO: Initialize failure_threshold and timeout
        # TODO: Initialize failure_count = 0
        # TODO: Initialize last_failure_time = None
        # TODO: Initialize state = CircuitState.CLOSED
        # TODO: Initialize threading.Lock() for thread safety
        pass

    def call(self, func, *args, **kwargs):
        """Execute function through circuit breaker."""
        # TODO: Use lock to check state
        # TODO: If OPEN, check if should_attempt_reset()
        #       - If yes, set state to HALF_OPEN
        #       - If no, raise CircuitBreakerOpen

        # TODO: Try to execute func(*args, **kwargs)
        # TODO: On success, call _on_success()
        # TODO: On exception, call _on_failure() then re-raise
        pass

    def _should_attempt_reset(self):
        """Check if enough time passed for reset."""
        # TODO: Check if last_failure_time is None
        # TODO: Check if (current_time - last_failure_time) >= timeout
        pass

    def _on_success(self):
        """Handle successful call."""
        # TODO: Reset failure_count to 0
        # TODO: If state is HALF_OPEN, change to CLOSED
        pass

    def _on_failure(self):
        """Handle failed call."""
        # TODO: Increment failure_count
        # TODO: Update last_failure_time to current time
        # TODO: If state is HALF_OPEN, set to OPEN
        # TODO: If failure_count >= threshold, set to OPEN
        pass

    @property
    def current_state(self):
        """Get current circuit state."""
        # TODO: Return current state (use lock for thread safety)
        pass

resilient_client.py

class ResilientCalculatorClient:
    """Calculator client with circuit breaker."""

    def __init__(self, address='localhost:50051'):
        # TODO: Create gRPC channel
        # TODO: Create stub
        # TODO: Initialize CircuitBreaker with threshold=3, timeout=5.0
        pass

    def add(self, a, b):
        """Add with circuit breaker protection."""
        # TODO: Define inner function _rpc_call() that calls stub.Add
        # TODO: Call circuit_breaker.call(_rpc_call)
        # TODO: Catch CircuitBreakerOpen and return None (fail fast)
        # TODO: Catch grpc.RpcError and handle appropriately
        pass

    def close(self):
        """Close the channel."""
        # TODO: Close the gRPC channel
        pass

Makefile (Optional)

.PHONY: proto server client test clean

proto:
    python -m grpc_tools.protoc \
        -I./proto \
        --python_out=./generated \
        --grpc_python_out=./generated \
        ./proto/calculator.proto

server:
    python server.py

client:
    python client.py

test:
    python -m pytest tests/

clean:
    rm -rf generated/
    find . -type d -name __pycache__ -exec rm -rf {} +
    find . -type f -name '*.pyc' -delete

Testing Examples

See Testing Guide for comprehensive test examples.