
Part 4: Circuit Breaker Pattern

Weight: 15%

Prevent cascading failures with the circuit breaker pattern.


Objectives

  • Implement the circuit breaker pattern on the client
  • Prevent wasted resources during service outages
  • Understand fail-fast vs retry strategies

Background

The Problem: Cascading Failures

When a server becomes unavailable:

  • Without Circuit Breaker: Every client request waits for the full timeout
      • 1000 clients × 2-second timeout = 2000 seconds of cumulative waiting
      • Threads/connections blocked
      • Resources exhausted
      • Poor user experience

  • With Circuit Breaker: Fail fast after detecting a failure pattern
      • Detect failure pattern (e.g., 3 consecutive failures)
      • Stop sending requests (circuit "open")
      • Periodically test if service recovered (circuit "half-open")
      • Resume normal operation when recovered (circuit "closed")
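
The arithmetic above can be checked with a few lines of Python. This is only a rough sketch under stated assumptions: a single client sends its requests one after another through one breaker, a rejected call costs essentially no time, and half-open probes are ignored. The function name `wasted_seconds` and its parameters are made up for this illustration.

```python
def wasted_seconds(requests, timeout_s, failure_threshold=None, fast_fail_s=0.0):
    """Estimate cumulative time spent waiting on a server that is down.

    failure_threshold=None models a client with no circuit breaker.
    """
    if failure_threshold is None:
        # Every request waits for the full timeout.
        return requests * timeout_s
    # Only the first `failure_threshold` requests wait; the rest fail fast.
    slow = min(requests, failure_threshold)
    return slow * timeout_s + (requests - slow) * fast_fail_s


without_breaker = wasted_seconds(1000, 2.0)
with_breaker = wasted_seconds(1000, 2.0, failure_threshold=3)
print(without_breaker, with_breaker)  # 2000.0 6.0
```

In a real run the breaker also lets a probe through every `timeout` seconds, so the true figure sits somewhat above this lower bound.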

Circuit States

┌─────────┐ 3 failures  ┌──────┐ timeout (5s) ┌───────────┐
│ CLOSED  │─────────────>│ OPEN │─────────────>│ HALF_OPEN │
│         │              │      │              │           │
└─────────┘              └──────┘              └───────────┘
     ^                       |                       |
     |                       | (fail fast)           |
     └───────────────────────┴───────────────────────┘
               (success - reset counter)
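
One way to read the diagram is as a small transition table. The sketch below is illustrative only: the event names ("threshold_reached", "timeout_elapsed", "success", "failure") are labels invented here, not part of the assignment. It also spells out the HALF_OPEN failure case, which returns the circuit to OPEN.

```python
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


# (current state, event) -> next state. Event names are hypothetical labels.
TRANSITIONS = {
    (CircuitState.CLOSED, "threshold_reached"): CircuitState.OPEN,
    (CircuitState.OPEN, "timeout_elapsed"): CircuitState.HALF_OPEN,
    (CircuitState.HALF_OPEN, "success"): CircuitState.CLOSED,
    (CircuitState.HALF_OPEN, "failure"): CircuitState.OPEN,
}


def next_state(state, event):
    """Return the next state, or the same state if the event causes no transition."""
    return TRANSITIONS.get((state, event), state)


state = CircuitState.CLOSED
for event in ["threshold_reached", "timeout_elapsed", "failure", "timeout_elapsed", "success"]:
    state = next_state(state, event)
    print(f"{event:>18} -> {state.name}")
```

Any pair not listed (for example a success while CLOSED) leaves the state unchanged, which matches the "reset counter" arrow that stays in CLOSED.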

Requirements

1. Circuit Breaker Implementation

Create a CircuitBreaker class:

from enum import Enum
import time
import threading

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    def __init__(self, failure_threshold=3, timeout=5.0):
        """
        Args:
            failure_threshold: Number of failures before opening circuit
            timeout: Seconds to wait before trying half-open
        """
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        """
        Execute function through circuit breaker.

        Args:
            func: Function to execute
            *args, **kwargs: Arguments to pass to func

        Returns:
            Result from func

        Raises:
            CircuitBreakerOpen: If circuit is open
        """
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self.state = CircuitState.HALF_OPEN
                    print("Circuit HALF_OPEN - testing recovery")
                else:
                    raise CircuitBreakerOpen("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            # Any exception from func counts as a failure, then propagates to the caller.
            self._on_failure()
            raise

    def _should_attempt_reset(self):
        """Check if enough time has passed to try recovery."""
        if self.last_failure_time is None:
            return False
        return time.time() - self.last_failure_time >= self.timeout

    def _on_success(self):
        """Handle successful call."""
        with self.lock:
            self.failure_count = 0
            if self.state == CircuitState.HALF_OPEN:
                self.state = CircuitState.CLOSED
                print("Circuit CLOSED - service recovered")

    def _on_failure(self):
        """Handle failed call."""
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                # Failed during recovery test
                self.state = CircuitState.OPEN
                print("Circuit OPEN - recovery test failed")
            elif self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit OPEN - {self.failure_count} consecutive failures")

class CircuitBreakerOpen(Exception):
    """Raised when circuit breaker is open."""
    pass
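
The class above lets every concurrent caller through once the circuit is HALF_OPEN. If you want exactly one trial request at a time, one possible variant is sketched below. It is not the required solution: the class name `SingleProbeBreaker`, the `probe_in_flight` flag, and the injectable `clock` parameter are assumptions added for illustration. It uses `time.monotonic` by default so wall-clock adjustments do not affect the timeout.

```python
import threading
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected without being attempted."""


class SingleProbeBreaker:
    """Variant that admits only one trial call at a time while HALF_OPEN."""

    def __init__(self, failure_threshold=3, timeout=5.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock              # injectable so tests need not sleep
        self.failure_count = 0
        self.opened_at = None
        self.probe_in_flight = False
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()

    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self.clock() - self.opened_at < self.timeout:
                    raise CircuitBreakerOpen("circuit is OPEN")
                self.state = CircuitState.HALF_OPEN
            is_probe = self.state == CircuitState.HALF_OPEN
            if is_probe:
                if self.probe_in_flight:
                    raise CircuitBreakerOpen("recovery probe already in flight")
                self.probe_in_flight = True
        # The lock is released here so a slow RPC does not block other callers.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record(success=False, is_probe=is_probe)
            raise
        self._record(success=True, is_probe=is_probe)
        return result

    def _record(self, success, is_probe):
        with self.lock:
            if is_probe:
                self.probe_in_flight = False
            if success:
                self.failure_count = 0
                if is_probe:
                    self.state = CircuitState.CLOSED
            else:
                self.failure_count += 1
                if is_probe or self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
                    self.opened_at = self.clock()
```

Passing a fake clock such as `clock=lambda: now[0]` lets a unit test advance time instantly instead of sleeping for the full timeout.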

2. Integrate with Client

Wrap RPC calls with circuit breaker:

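import grpc

import calculator_pb2
import calculator_pb2_grpc
from circuit_breaker import CircuitBreaker, CircuitBreakerOpen

class ResilientCalculatorClient: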
    def __init__(self, address='localhost:50051'):
        self.channel = grpc.insecure_channel(address)
        self.stub = calculator_pb2_grpc.CalculatorStub(self.channel)
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=3,
            timeout=5.0
        )

    def add(self, a, b):
        """Add with circuit breaker protection."""
        def _rpc_call():
            return self.stub.Add(
                calculator_pb2.BinaryOperation(a=a, b=b),
                timeout=2.0
            )

        try:
            return self.circuit_breaker.call(_rpc_call)
        except CircuitBreakerOpen as e:
            print(f"Fast fail: {e}")
            return None
        except grpc.RpcError as e:
            print(f"RPC failed: {e.code()}")
            raise
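
As written, the breaker counts every exception as a failure, including errors caused by the caller (for example an invalid argument) rather than by an unhealthy server. One optional refinement is to classify status codes before counting them. The sketch below works on status code names so it runs without gRPC installed. Which codes should trip the breaker is an assumption made here for illustration, and `counts_as_failure` is a hypothetical helper name.

```python
# Names match gRPC's canonical status codes. Treating exactly these four as
# "server is unhealthy" is an assumption for this sketch, not a gRPC rule.
TRIPPING_CODES = frozenset({
    "UNAVAILABLE",
    "DEADLINE_EXCEEDED",
    "RESOURCE_EXHAUSTED",
    "INTERNAL",
})


def counts_as_failure(code_name):
    """Return True if an RPC that ended with this status should count toward opening the circuit."""
    return code_name in TRIPPING_CODES


for name in ["UNAVAILABLE", "DEADLINE_EXCEEDED", "INVALID_ARGUMENT", "NOT_FOUND"]:
    print(f"{name:>18}: counts as failure = {counts_as_failure(name)}")
```

With the real client, the name would come from `e.code().name` on a caught `grpc.RpcError`, since `grpc.StatusCode` is a standard Python enum.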

Testing Requirements

Test Scenarios

Test                | Server State               | Expected Behavior
--------------------+----------------------------+---------------------------------------------
Normal              | Running normally           | All requests succeed, circuit stays CLOSED
Gradual Failure     | 1-2 failures, then success | Circuit stays CLOSED, failure count resets
Circuit Opens       | 3 consecutive failures     | Circuit opens, subsequent requests fail fast
Recovery Test       | Wait 5s after opening      | Circuit goes HALF_OPEN, tests one request
Successful Recovery | Service back online        | Circuit closes, normal operation resumes
Failed Recovery     | Still offline              | Circuit reopens, waits another 5s

Demo Script

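import time

import grpc

from resilient_client import ResilientCalculatorClient


def try_add(client, a, b):
    """Call add() and describe the outcome without letting RPC errors abort the demo."""
    try:
        result = client.add(a, b)
    except grpc.RpcError as e:
        # The call was attempted and failed (circuit was CLOSED or HALF_OPEN).
        return f"RPC FAILED ({e.code()})"
    # add() returns None when the breaker rejected the call.
    return result.value if result is not None else "CIRCUIT OPEN - FAIL FAST"


def demo_circuit_breaker():
    """Demonstrate circuit breaker behavior."""
    client = ResilientCalculatorClient()

    print("=== Test 1: Normal Operation ===")
    for i in range(5):
        print(f"10 + {i} = {try_add(client, 10, i)}")

    print("\n=== Test 2: Server Goes Down ===")
    print("(Stop the server now)")
    time.sleep(3)

    for i in range(10):
        print(f"Attempt {i+1}: {try_add(client, 10, i)}")
        time.sleep(1)

    print("\n=== Test 3: Server Recovers ===")
    print("(Restart the server now)")
    time.sleep(5)

    for i in range(5):
        print(f"10 + {i} = {try_add(client, 10, i)}")
        time.sleep(1)


if __name__ == "__main__":
    demo_circuit_breaker()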

Deliverables

📦 Code Files:

  • circuit_breaker.py - Circuit breaker implementation
  • resilient_client.py - Client with circuit breaker
  • demo_part4.py - Demonstration script

📊 Metrics Report:

Track and report:

  • Total requests attempted
  • Requests blocked by circuit breaker (fast fails)
  • Time saved by failing fast
  • Circuit state transitions
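
A small recorder object is one way to collect these four numbers. The sketch below is a starting point, not a required design: the class name `BreakerMetrics` and its methods are hypothetical, and "time saved" is estimated by assuming each fast fail would otherwise have waited for the full RPC timeout.

```python
from dataclasses import dataclass, field


@dataclass
class BreakerMetrics:
    rpc_timeout: float = 2.0                 # seconds a blocked call would have waited
    total_requests: int = 0
    fast_fails: int = 0
    transitions: list = field(default_factory=list)

    def record_request(self, fast_failed):
        """Count one attempted request; fast_failed=True if the breaker rejected it."""
        self.total_requests += 1
        if fast_failed:
            self.fast_fails += 1

    def record_transition(self, old_state, new_state, at):
        """Remember a state change together with its timestamp."""
        self.transitions.append((at, old_state, new_state))

    @property
    def time_saved(self):
        # Upper-bound estimate: each fast fail avoided one full timeout.
        return self.fast_fails * self.rpc_timeout

    def summary(self):
        return {
            "total_requests": self.total_requests,
            "fast_fails": self.fast_fails,
            "time_saved_s": self.time_saved,
            "transitions": list(self.transitions),
        }


metrics = BreakerMetrics(rpc_timeout=2.0)
for fast_failed in [False, False, False, True, True]:
    metrics.record_request(fast_failed)
metrics.record_transition("closed", "open", at=3.0)
print(metrics.summary())
```

If the recorder is shared between threads, its updates need the same lock protection as the breaker's own counters.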

📹 Demo Video (2 minutes):

Show:

  1. Normal operation - all requests succeed
  2. Server goes down - circuit opens after 3 failures
  3. Subsequent requests fail fast (no waiting)
  4. Wait 5 seconds - circuit half-opens
  5. Server back - circuit closes, normal operation

Grading Rubric

Criterion         | Points | Description
------------------+--------+-------------------------------------------
Circuit States    | 5      | Correct CLOSED/OPEN/HALF_OPEN transitions
Failure Detection | 3      | Opens after threshold failures
Recovery Testing  | 3      | Half-open state tests recovery
Thread Safety     | 2      | Uses locks for concurrent access
Demo              | 2      | Clear demonstration of all states
Total             | 15     |

Analysis Questions

Answer these in your report:

Question 1

Why does the circuit breaker "fail fast" instead of retrying when open?

Question 2

What's the purpose of the HALF_OPEN state? Why not go directly from OPEN to CLOSED?

Question 3

In a microservice architecture with Service A → B → C, where should circuit breakers be placed?

Question 4

How would you choose the failure_threshold and timeout values for a real system?


Tips

Testing Circuit Breaker

Use two terminal windows:

Terminal 1: Run server

python server.py

Terminal 2: Run client demo

python demo_part4.py

Stop and restart the server to trigger circuit state changes.

Thread Safety

The circuit breaker may be called from multiple threads at once. Use a lock to protect every read and write of shared state (failure count, state, last failure time):

with self.lock:
    self.failure_count += 1

Real-World Libraries

Production systems use libraries like:

  • pybreaker (Python)
  • Hystrix (Java/Netflix; now in maintenance mode, with Resilience4j as a common successor)
  • Polly (.NET)

Your implementation teaches the core concepts!


Advanced Challenge (Optional)

Add these features for extra credit:

1. Metrics Dashboard

Track and display:

  • Request success/failure rate
  • Circuit state over time
  • Average response time

2. Per-Operation Circuit Breakers

Different operations might have different failure characteristics:

breakers = {
    'add': CircuitBreaker(failure_threshold=3),
    'divide': CircuitBreaker(failure_threshold=5),  # More tolerant
}

3. Graceful Degradation

Return cached/default values when circuit is open:

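def add_with_fallback(a, b):
    # stub.Add expects a request message, not the raw operands.
    request = calculator_pb2.BinaryOperation(a=a, b=b)
    try:
        return circuit_breaker.call(stub.Add, request, timeout=2.0)
    except CircuitBreakerOpen:
        # default_response() is a placeholder for a fallback you define.
        return default_response()  # Fallback behavior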
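
One possible fallback is to remember the last good result per request and serve it while the circuit is open. The sketch below is illustrative only: `FallbackCache` is a hypothetical helper, the `CircuitBreakerOpen` class is redefined here as a stand-in so the example runs on its own, and the returned flag tells the caller whether the value is degraded.

```python
class CircuitBreakerOpen(Exception):
    """Stand-in for the exception raised by the circuit breaker."""


class FallbackCache:
    """Serve the last good result (or a default) when the breaker rejects a call."""

    def __init__(self, default=None):
        self.default = default
        self.last_good = {}

    def call(self, key, func):
        """Run func(); return (value, degraded) where degraded=True means a fallback was used."""
        try:
            result = func()
        except CircuitBreakerOpen:
            return self.last_good.get(key, self.default), True
        # Only successful results are cached.
        self.last_good[key] = result
        return result, False


def rejected():
    raise CircuitBreakerOpen("Circuit breaker is OPEN")


cache = FallbackCache(default=0)
print(cache.call(("add", 2, 3), lambda: 5))   # (5, False)
print(cache.call(("add", 2, 3), rejected))    # (5, True)
print(cache.call(("add", 9, 9), rejected))    # (0, True)
```

Keying the cache by operation and operands keeps a stale answer for one request from being served for another. Whether a stale or default value is acceptable at all depends on the operation.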


Next Steps

Congratulations! You've built a resilient RPC service with comprehensive failure handling.

For bonus points, compare your RPC implementation with REST in the Bonus Challenge.


Resources