Skip to content

Testing Guide

Overview

Comprehensive testing strategies for your RPC calculator assignment.


Testing Framework

Use pytest for automated testing:

pip install pytest pytest-asyncio

Project Structure

tests/
├── __init__.py
├── conftest.py              # Shared fixtures
├── test_part1_basic.py
├── test_part2_retry.py
├── test_part3_idempotency.py
└── test_part4_circuit_breaker.py

Part 1: Basic Operations

test_part1_basic.py

import pytest
import grpc
import sys
import os

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'generated'))
import calculator_pb2
import calculator_pb2_grpc


@pytest.fixture
def stub():
    """Create gRPC stub for testing."""
    channel = grpc.insecure_channel('localhost:50051')
    stub = calculator_pb2_grpc.CalculatorStub(channel)
    yield stub
    channel.close()


class TestBasicOperations:
    """Test basic calculator operations."""

    def test_add(self, stub):
        """Test addition operation."""
        response = stub.Add(calculator_pb2.BinaryOperation(a=10, b=5))
        assert response.value == 15.0

    def test_subtract(self, stub):
        """Test subtraction operation."""
        response = stub.Subtract(calculator_pb2.BinaryOperation(a=10, b=5))
        assert response.value == 5.0

    def test_multiply(self, stub):
        """Test multiplication operation."""
        response = stub.Multiply(calculator_pb2.BinaryOperation(a=10, b=5))
        assert response.value == 50.0

    def test_divide(self, stub):
        """Test division operation."""
        response = stub.Divide(calculator_pb2.BinaryOperation(a=10, b=5))
        assert response.value == 2.0

    def test_divide_by_zero(self, stub):
        """Test division by zero error handling."""
        with pytest.raises(grpc.RpcError) as exc_info:
            stub.Divide(calculator_pb2.BinaryOperation(a=10, b=0))

        assert exc_info.value.code() == grpc.StatusCode.INVALID_ARGUMENT
        assert 'zero' in exc_info.value.details().lower()

    def test_negative_numbers(self, stub):
        """Test operations with negative numbers."""
        response = stub.Add(calculator_pb2.BinaryOperation(a=-10, b=5))
        assert response.value == -5.0

    def test_decimal_numbers(self, stub):
        """Test operations with decimal numbers."""
        response = stub.Multiply(calculator_pb2.BinaryOperation(a=2.5, b=4))
        assert response.value == 10.0

Part 2: Retry Logic

test_part2_retry.py

import pytest
import time
from unittest.mock import Mock, patch


class TestRetryLogic:
    """Test retry behavior."""

    def test_retry_on_timeout(self):
        """Test that timeouts trigger retries."""
        mock_stub = Mock()
        mock_stub.Add.side_effect = [
            grpc.RpcError(),  # First attempt fails
            grpc.RpcError(),  # Second attempt fails
            calculator_pb2.Result(value=15)  # Third attempt succeeds
        ]

        # Configure mock error
        for call in mock_stub.Add.side_effect[:-1]:
            call.code = lambda: grpc.StatusCode.DEADLINE_EXCEEDED

        result = call_with_retry(mock_stub, mock_stub.Add, request, max_retries=3)

        assert result.value == 15
        assert mock_stub.Add.call_count == 3

    def test_no_retry_on_invalid_argument(self):
        """Test that invalid arguments don't trigger retries."""
        mock_stub = Mock()
        error = grpc.RpcError()
        error.code = lambda: grpc.StatusCode.INVALID_ARGUMENT
        mock_stub.Add.side_effect = error

        with pytest.raises(grpc.RpcError):
            call_with_retry(mock_stub, mock_stub.Add, request, max_retries=3)

        # Should only try once
        assert mock_stub.Add.call_count == 1

    def test_exponential_backoff(self):
        """Test that backoff increases exponentially."""
        wait_times = []

        def mock_sleep(duration):
            wait_times.append(duration)

        with patch('time.sleep', mock_sleep):
            mock_stub = Mock()
            mock_stub.Add.side_effect = [
                grpc.RpcError(),
                grpc.RpcError(),
                calculator_pb2.Result(value=15)
            ]

            for call in mock_stub.Add.side_effect[:-1]:
                call.code = lambda: grpc.StatusCode.DEADLINE_EXCEEDED

            call_with_retry(mock_stub, mock_stub.Add, request)

        # Check backoff pattern (approximately 1s, 2s with jitter)
        assert 1.0 <= wait_times[0] <= 2.0
        assert 2.0 <= wait_times[1] <= 3.0

Part 3: Idempotency

test_part3_idempotency.py

import uuid
import pytest


class TestIdempotency:
    """Test idempotent operations."""

    @pytest.fixture
    def reset_counter(self, stub):
        """Reset counter before each test."""
        stub.ResetCounter(calculator_pb2.Empty())

    def test_normal_increment(self, stub, reset_counter):
        """Test normal counter increment."""
        request_id = str(uuid.uuid4())
        response = stub.IncrementCounter(
            calculator_pb2.CounterRequest(request_id=request_id, increment=1)
        )

        assert response.value == 1
        assert not response.was_duplicate

    def test_duplicate_detection(self, stub, reset_counter):
        """Test that duplicate requests are detected."""
        request_id = str(uuid.uuid4())

        # First request
        response1 = stub.IncrementCounter(
            calculator_pb2.CounterRequest(request_id=request_id, increment=5)
        )
        assert response1.value == 5
        assert not response1.was_duplicate

        # Duplicate request (same request_id)
        response2 = stub.IncrementCounter(
            calculator_pb2.CounterRequest(request_id=request_id, increment=5)
        )
        assert response2.value == 5  # Same value returned
        assert response2.was_duplicate  # Marked as duplicate

        # Counter should still be 5 (not 10)
        current = stub.GetCounter(calculator_pb2.Empty())
        assert current.value == 5

    def test_different_request_ids(self, stub, reset_counter):
        """Test that different request IDs increment separately."""
        request_id1 = str(uuid.uuid4())
        request_id2 = str(uuid.uuid4())

        response1 = stub.IncrementCounter(
            calculator_pb2.CounterRequest(request_id=request_id1, increment=3)
        )
        assert response1.value == 3

        response2 = stub.IncrementCounter(
            calculator_pb2.CounterRequest(request_id=request_id2, increment=2)
        )
        assert response2.value == 5  # 3 + 2

    def test_retry_scenario(self, stub, reset_counter):
        """Simulate retry scenario with same request_id."""
        request_id = str(uuid.uuid4())

        # Simulate 3 attempts with same request_id (as in retry)
        for i in range(3):
            response = stub.IncrementCounter(
                calculator_pb2.CounterRequest(request_id=request_id, increment=1)
            )

            if i == 0:
                assert not response.was_duplicate
            else:
                assert response.was_duplicate

            # Value should always be 1 (not 3)
            assert response.value == 1

Part 4: Circuit Breaker

test_part4_circuit_breaker.py

import pytest
import time
from circuit_breaker import CircuitBreaker, CircuitState, CircuitBreakerOpen


class TestCircuitBreaker:
    """Test circuit breaker behavior."""

    def test_closed_state_success(self):
        """Test that successful calls keep circuit closed."""
        breaker = CircuitBreaker(failure_threshold=3, timeout=5)

        def success():
            return "success"

        for _ in range(5):
            result = breaker.call(success)
            assert result == "success"

        assert breaker.current_state == CircuitState.CLOSED

    def test_opens_after_threshold(self):
        """Test that circuit opens after threshold failures."""
        breaker = CircuitBreaker(failure_threshold=3, timeout=5)

        def fail():
            raise Exception("failure")

        # First 2 failures - circuit stays closed
        for _ in range(2):
            with pytest.raises(Exception):
                breaker.call(fail)
            assert breaker.current_state == CircuitState.CLOSED

        # 3rd failure - circuit opens
        with pytest.raises(Exception):
            breaker.call(fail)
        assert breaker.current_state == CircuitState.OPEN

    def test_open_circuit_fails_fast(self):
        """Test that open circuit rejects calls immediately."""
        breaker = CircuitBreaker(failure_threshold=2, timeout=5)

        def fail():
            raise Exception("failure")

        # Trigger opening
        for _ in range(2):
            with pytest.raises(Exception):
                breaker.call(fail)

        assert breaker.current_state == CircuitState.OPEN

        # Next call should fail fast
        with pytest.raises(CircuitBreakerOpen):
            breaker.call(fail)

    def test_half_open_after_timeout(self):
        """Test transition to half-open after timeout."""
        breaker = CircuitBreaker(failure_threshold=2, timeout=1)

        def fail():
            raise Exception("failure")

        # Open circuit
        for _ in range(2):
            with pytest.raises(Exception):
                breaker.call(fail)

        assert breaker.current_state == CircuitState.OPEN

        # Wait for timeout
        time.sleep(1.1)

        # Next call should attempt (half-open)
        try:
            breaker.call(fail)
        except Exception:
            pass

        # State should have been half-open during call
        # (now back to open because call failed)
        assert breaker.current_state == CircuitState.OPEN

    def test_recovery_closes_circuit(self):
        """Test that successful recovery closes circuit."""
        breaker = CircuitBreaker(failure_threshold=2, timeout=1)

        call_count = [0]

        def sometimes_fail():
            call_count[0] += 1
            if call_count[0] <= 2:
                raise Exception("failure")
            return "success"

        # Open circuit
        for _ in range(2):
            with pytest.raises(Exception):
                breaker.call(sometimes_fail)

        assert breaker.current_state == CircuitState.OPEN

        # Wait and recover
        time.sleep(1.1)
        result = breaker.call(sometimes_fail)

        assert result == "success"
        assert breaker.current_state == CircuitState.CLOSED

Integration Tests

test_integration.py

import pytest
import subprocess
import time
import grpc


@pytest.fixture(scope="module")
def server_process():
    """Start server for integration tests."""
    process = subprocess.Popen(['python', 'server.py'])
    time.sleep(2)  # Wait for server to start
    yield process
    process.terminate()
    process.wait()


class TestIntegration:
    """Integration tests with real server."""

    def test_end_to_end(self, server_process):
        """Test complete flow with real server."""
        channel = grpc.insecure_channel('localhost:50051')
        stub = calculator_pb2_grpc.CalculatorStub(channel)

        # Basic operation
        response = stub.Add(calculator_pb2.BinaryOperation(a=10, b=5))
        assert response.value == 15.0

        # Error handling
        with pytest.raises(grpc.RpcError):
            stub.Divide(calculator_pb2.BinaryOperation(a=10, b=0))

        channel.close()

Load Testing

load_test.py

import time
import concurrent.futures
import statistics


def load_test(stub, num_requests=1000, concurrency=10):
    """
    Perform load test on calculator service.

    Args:
        stub: gRPC stub
        num_requests: Total requests to send
        concurrency: Number of concurrent clients

    Returns:
        dict with performance metrics
    """
    latencies = []
    errors = 0

    def make_request(i):
        start = time.perf_counter()
        try:
            stub.Add(
                calculator_pb2.BinaryOperation(a=i, b=1),
                timeout=2.0
            )
            latency = (time.perf_counter() - start) * 1000
            return latency, None
        except Exception as e:
            return None, e

    # Execute requests concurrently
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(make_request, i) for i in range(num_requests)]

        for future in concurrent.futures.as_completed(futures):
            latency, error = future.result()
            if error:
                errors += 1
            else:
                latencies.append(latency)

    # Calculate statistics
    return {
        'total_requests': num_requests,
        'successful': len(latencies),
        'errors': errors,
        'mean_latency': statistics.mean(latencies),
        'median_latency': statistics.median(latencies),
        'p95_latency': statistics.quantiles(latencies, n=20)[18],
        'p99_latency': statistics.quantiles(latencies, n=100)[98],
        'min_latency': min(latencies),
        'max_latency': max(latencies),
    }


if __name__ == '__main__':
    channel = grpc.insecure_channel('localhost:50051')
    stub = calculator_pb2_grpc.CalculatorStub(channel)

    print("Running load test...")
    results = load_test(stub, num_requests=1000, concurrency=10)

    print("\n=== Load Test Results ===")
    for key, value in results.items():
        if 'latency' in key:
            print(f"{key}: {value:.2f} ms")
        else:
            print(f"{key}: {value}")

    channel.close()

Manual Testing Scripts

test_scenarios.sh

#!/bin/bash

echo "=== Test Scenario 1: Normal Operation ==="
python client.py
echo ""

echo "=== Test Scenario 2: Server with Failures ==="
python server_with_failures.py --failure-rate 0.3 &
SERVER_PID=$!
sleep 2
python client_with_retry.py
kill $SERVER_PID
echo ""

echo "=== Test Scenario 3: Idempotency ==="
python test_idempotency.py
echo ""

echo "=== Test Scenario 4: Circuit Breaker ==="
python demo_circuit_breaker.py

Test Coverage

Generate Coverage Report

# Install coverage tool
pip install pytest-cov

# Run tests with coverage
pytest --cov=. --cov-report=html

# View report
open htmlcov/index.html

Continuous Testing

GitHub Actions Workflow

# .github/workflows/test.yml
name: Test

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2

    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: 3.8

    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-cov

    - name: Generate protobuf
      run: make proto

    - name: Run tests
      run: pytest --cov

Best Practices

✅ Do's

  • Write tests before implementing features (TDD)
  • Test both success and failure cases
  • Use fixtures for common setup
  • Mock external dependencies
  • Test edge cases (negative numbers, zero, large values)
  • Measure test coverage
  • Run tests automatically in CI/CD

❌ Don'ts

  • Don't test implementation details
  • Don't write flaky tests (timing-dependent without proper waits)
  • Don't skip error cases
  • Don't commit commented-out tests
  • Don't rely on test execution order
  • Don't hardcode values that might change

Debugging Failed Tests

Enable Verbose Output

pytest -v -s

Run Specific Test

pytest tests/test_part1_basic.py::TestBasicOperations::test_add

Debug with pdb

def test_something(stub):
    import pdb; pdb.set_trace()
    response = stub.Add(...)

Check Server Logs

# Terminal 1: Server with debug logging
python server.py --log-level DEBUG

# Terminal 2: Run tests
pytest

Next Steps

  • Complete all test cases for each part
  • Achieve >80% code coverage
  • Run tests before submitting
  • Include test results in your report