
gRPC Setup Guide

Understanding gRPC Components

Protocol Buffers (protobuf)

Protocol Buffers are Google's language-neutral, platform-neutral mechanism for serializing structured data.

Key concepts:

  • .proto files - Define your service interface and data structures
  • Messages - Data structures (like classes/structs)
  • Services - Groups of RPC methods (each method is comparable to a REST route)
  • Field numbers - Unique identifiers for fields (1, 2, 3...)
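
To make the role of field numbers concrete, the sketch below shows how an integer field ends up on the wire: the encoder writes a tag computed as (field_number << 3) | wire_type, followed by the value as a varint. The helper names encode_varint and encode_int_field are illustrative only; the protobuf library does this work for you.

```python
WIRE_TYPE_VARINT = 0  # wire type used for int32/int64/bool fields


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)         # high bit clear: last byte
            return bytes(out)


def encode_int_field(field_number: int, value: int) -> bytes:
    """Encode one varint field as tag + value (illustrative helper)."""
    tag = (field_number << 3) | WIRE_TYPE_VARINT
    return encode_varint(tag) + encode_varint(value)


# Field number 1 holding the value 150
print(encode_int_field(1, 150).hex())  # 089601
```

Because the number, not the name, identifies a field on the wire, renaming a field keeps old and new code compatible, while renumbering it does not.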

Code Generation

The gRPC tooling (grpc_tools.protoc) generates two Python files from each .proto file:

  1. *_pb2.py - Contains message classes
  2. *_pb2_grpc.py - Contains service client (stub) and server classes
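
As a rough illustration of the naming rule, the sketch below derives the two output paths from a .proto path and builds the matching protoc command line. Both helper names are hypothetical, and the sketch assumes the .proto file sits directly inside the include directory (nested packages produce nested output directories).

```python
import sys
from pathlib import Path
from typing import List, Tuple


def generated_module_paths(proto_file: str, out_dir: str) -> Tuple[Path, Path]:
    """Return the (*_pb2.py, *_pb2_grpc.py) paths protoc is expected to write."""
    stem = Path(proto_file).stem
    out = Path(out_dir)
    return out / f"{stem}_pb2.py", out / f"{stem}_pb2_grpc.py"


def protoc_command(proto_file: str, include_dir: str, out_dir: str) -> List[str]:
    """Build the grpc_tools.protoc invocation as an argument list."""
    return [
        sys.executable, "-m", "grpc_tools.protoc",
        f"-I{include_dir}",
        f"--python_out={out_dir}",
        f"--grpc_python_out={out_dir}",
        proto_file,
    ]


messages, services = generated_module_paths("./proto/calculator.proto", "./generated")
print(messages.name, services.name)
```

Passing the list to subprocess.run(..., check=True) would run the generator, provided the grpcio-tools package is installed and the output directory exists.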

Detailed Configuration

Server Configuration

Thread Pool Settings

from concurrent import futures

import grpc

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    options=[
        ('grpc.max_send_message_length', 50 * 1024 * 1024),  # 50 MB
        ('grpc.max_receive_message_length', 50 * 1024 * 1024),
    ]
)

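max_workers: Maximum number of threads that handle requests concurrently. In the synchronous server each in-flight RPC occupies one thread until its handler returns, so this value also caps the number of RPCs served at once. The two options set the per-message size limit to 50 MB in each direction (gRPC's default receive limit is 4 MB).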
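
The effect of the cap can be seen without gRPC at all. The sketch below is a stand-in under stated assumptions: it submits simulated handlers to a plain ThreadPoolExecutor and records the highest number running at once. The function peak_concurrency and its sleep-based "handler" are hypothetical and only model the behaviour.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def peak_concurrency(max_workers: int, num_tasks: int, task_seconds: float = 0.05) -> int:
    """Run num_tasks simulated handlers and return the peak number running at once."""
    lock = threading.Lock()
    running = 0
    peak = 0

    def handler() -> None:
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(task_seconds)  # stands in for the work an RPC handler does
        with lock:
            running -= 1

    # Leaving the with-block waits for every submitted task to finish
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for _ in range(num_tasks):
            pool.submit(handler)
    return peak


print(peak_concurrency(max_workers=3, num_tasks=12))  # never exceeds 3
```

Tasks beyond the cap wait in the executor's queue, which is why slow handlers or long-lived streams can starve other requests when max_workers is small.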

Port Configuration

# Loopback only (IPv4); reachable from the local machine only
server.add_insecure_port('127.0.0.1:50051')

# All interfaces, IPv6 and (on dual-stack hosts) IPv4
server.add_insecure_port('[::]:50051')

# All IPv4 interfaces
server.add_insecure_port('0.0.0.0:50051')

Graceful Shutdown

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # ... add servicer ...
    server.add_insecure_port('[::]:50051')
    server.start()

    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
        server.stop(grace=5).wait()  # give in-flight RPCs up to 5 seconds, then block until stopped
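
Process managers and container runtimes usually stop a service with SIGTERM, which does not raise KeyboardInterrupt. A minimal sketch of a signal-driven variant follows; serve_until_signalled is a hypothetical helper, and it only assumes the server object has start() and a stop(grace) that returns a threading.Event, as grpc.Server does.

```python
import signal
import threading


def serve_until_signalled(server, grace=5.0,
                          signals=(signal.SIGINT, signal.SIGTERM),
                          stop_requested=None):
    """Start the server, block until a signal arrives, then stop gracefully."""
    if stop_requested is None:
        stop_requested = threading.Event()

    def _request_stop(signum, frame):
        # Keep the handler tiny: just wake up the main thread
        stop_requested.set()

    # signal.signal must be called from the main thread
    for sig in signals:
        signal.signal(sig, _request_stop)

    server.start()
    stop_requested.wait()
    # stop() rejects new RPCs at once and returns an event that is set
    # when in-flight RPCs finish or the grace period runs out
    server.stop(grace).wait()
```

The signal handler only sets an event, so the actual shutdown runs in normal program flow rather than inside the handler.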

Client Configuration

Channel Options

channel = grpc.insecure_channel(
    'localhost:50051',
    options=[
        ('grpc.max_send_message_length', 50 * 1024 * 1024),
        ('grpc.max_receive_message_length', 50 * 1024 * 1024),
        ('grpc.keepalive_time_ms', 10000),    # send a keepalive ping every 10 s
        ('grpc.keepalive_timeout_ms', 5000),  # close the connection if no ack arrives within 5 s
    ]
)

Timeout Configuration

Per-call timeout:

response = stub.Add(
    request,
    timeout=2.0  # 2 seconds
)

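Default timeout for all calls:

gRPC Python stubs have no default-timeout setting, and with_call is a method of each RPC callable (it returns the response together with the call object), not of the stub. One way to get a fixed timeout is to bind it to the method:

import functools

stub = calculator_pb2_grpc.CalculatorStub(channel)
add = functools.partial(stub.Add, timeout=2.0)  # every add(request) uses a 2s timeout
response = add(request)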
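
A per-call timeout applies to each call separately, so an operation that makes several RPCs in a row can take several times that long. One possible approach, sketched here with a hypothetical Deadline helper, is to share a single time budget and pass whatever is left as each call's timeout.

```python
import time


class Deadline:
    """Tracks one time budget shared by several sequential calls (illustrative)."""

    def __init__(self, total_seconds: float) -> None:
        self._expires_at = time.monotonic() + total_seconds

    def remaining(self) -> float:
        """Seconds left in the budget; raises TimeoutError once it is spent."""
        left = self._expires_at - time.monotonic()
        if left <= 0:
            raise TimeoutError("time budget exhausted before the call was made")
        return left


# Intended use with a stub (not executed here):
#   deadline = Deadline(2.0)
#   first = stub.Add(request_1, timeout=deadline.remaining())
#   second = stub.Add(request_2, timeout=deadline.remaining())
deadline = Deadline(2.0)
print(f"{deadline.remaining():.1f}s left")
```

time.monotonic is used instead of time.time so the budget is unaffected by system clock adjustments.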

Error Handling

Status Codes

Common gRPC status codes:

Code               Meaning           When to Use
OK                 Success           Default for successful operations
INVALID_ARGUMENT   Bad input         Division by zero, invalid parameters
NOT_FOUND          Resource missing  Key doesn't exist
ALREADY_EXISTS     Duplicate         Creating existing resource
DEADLINE_EXCEEDED  Timeout           Operation took too long
UNAVAILABLE        Service down      Server unreachable
INTERNAL           Server error      Unexpected server-side errors

Setting Error Status (Server)

def Divide(self, request, context):
    if request.b == 0:
        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
        context.set_details('Division by zero is not allowed')
        return calculator_pb2.Result()  # Empty result

    return calculator_pb2.Result(value=request.a / request.b)

Handling Errors (Client)

try:
    response = stub.Divide(calculator_pb2.BinaryOperation(a=10, b=0))
    print(f"Result: {response.value}")
except grpc.RpcError as e:
    print(f"Error: {e.code()}")
    print(f"Details: {e.details()}")

    if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
        print("Invalid input provided")
    elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        print("Request timed out")

Logging and Debugging

Enable gRPC Logging

import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Or configure gRPC-specific logging
grpc_logger = logging.getLogger('grpc')
grpc_logger.setLevel(logging.DEBUG)

Environment Variables

# Enable verbose logging
export GRPC_VERBOSITY=DEBUG
export GRPC_TRACE=all

# Run your application
python server.py

Interceptors for Logging

class LoggingInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        print(f"Request: {handler_call_details.method}")
        return continuation(handler_call_details)

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[LoggingInterceptor()]
)

Testing with grpcurl

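grpcurl is a command-line tool for interacting with gRPC servers. The list and describe commands rely on server reflection; if the server does not expose it, pass the schema to grpcurl with -import-path and -proto instead.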

Installation

# macOS
brew install grpcurl

# Linux
wget https://github.com/fullstorydev/grpcurl/releases/download/v1.8.9/grpcurl_1.8.9_linux_x86_64.tar.gz
tar -xvf grpcurl_1.8.9_linux_x86_64.tar.gz
sudo mv grpcurl /usr/local/bin/

Usage

List services:

grpcurl -plaintext localhost:50051 list

Describe a service:

grpcurl -plaintext localhost:50051 describe calculator.Calculator

Make an RPC call:

grpcurl -plaintext -d '{"a": 10, "b": 5}' \
  localhost:50051 calculator.Calculator/Add


Performance Tuning

Connection Reuse

# Bad - creates new channel per request
def bad_client():
    for i in range(1000):
        channel = grpc.insecure_channel('localhost:50051')
        stub = calculator_pb2_grpc.CalculatorStub(channel)
        response = stub.Add(calculator_pb2.BinaryOperation(a=i, b=1))
        channel.close()

# Good - reuses channel
def good_client():
    channel = grpc.insecure_channel('localhost:50051')
    stub = calculator_pb2_grpc.CalculatorStub(channel)
    for i in range(1000):
        response = stub.Add(calculator_pb2.BinaryOperation(a=i, b=1))
    channel.close()
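
In a larger program the channel is often needed in many places. A minimal sketch of a shared cache follows, assuming one channel per target is enough; ChannelCache is a hypothetical helper, and the channel factory is injected so that grpc.insecure_channel (or a secure variant) can be passed in.

```python
import threading


class ChannelCache:
    """Creates at most one channel per target and hands it out on request."""

    def __init__(self, channel_factory):
        self._factory = channel_factory  # e.g. grpc.insecure_channel
        self._channels = {}
        self._lock = threading.Lock()

    def get(self, target):
        """Return the cached channel for target, creating it on first use."""
        with self._lock:
            channel = self._channels.get(target)
            if channel is None:
                channel = self._factory(target)
                self._channels[target] = channel
            return channel

    def close_all(self):
        """Close every cached channel, e.g. at program shutdown."""
        with self._lock:
            channels, self._channels = list(self._channels.values()), {}
        for channel in channels:
            channel.close()


# Intended use (not executed here):
#   cache = ChannelCache(grpc.insecure_channel)
#   stub = calculator_pb2_grpc.CalculatorStub(cache.get('localhost:50051'))
```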

Async/Await (Python 3.7+)

For applications that keep many RPCs in flight at once, use the asyncio API (grpc.aio):

import grpc.aio

async def async_add(a, b):
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = calculator_pb2_grpc.CalculatorStub(channel)
        response = await stub.Add(calculator_pb2.BinaryOperation(a=a, b=b))
        return response.value

# Run async function
import asyncio
result = asyncio.run(async_add(10, 5))
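
The benefit of the async API shows up when several calls run concurrently over one channel. The sketch below issues a batch of calls with a bound on how many are in flight; gather_limited is a hypothetical helper that accepts any function returning an awaitable, so a grpc.aio stub method fits.

```python
import asyncio


async def gather_limited(make_call, args_list, max_in_flight=10):
    """Await make_call(*args) for every args tuple, at most max_in_flight at a time."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def _one(args):
        async with semaphore:
            return await make_call(*args)

    # gather returns results in the same order as args_list
    return await asyncio.gather(*(_one(args) for args in args_list))


# Intended use inside a coroutine with a grpc.aio stub (not executed here):
#   results = await gather_limited(
#       lambda a, b: stub.Add(calculator_pb2.BinaryOperation(a=a, b=b)),
#       [(1, 2), (3, 4)],
#   )
```

The semaphore keeps a large batch from flooding the server, and the limit can be tuned to the server's max_workers.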

Security (Optional)

TLS/SSL Configuration

For production use, enable TLS:

Server:

with open('server.key', 'rb') as f:
    private_key = f.read()
with open('server.crt', 'rb') as f:
    certificate_chain = f.read()

server_credentials = grpc.ssl_server_credentials(
    ((private_key, certificate_chain),)
)
server.add_secure_port('[::]:50051', server_credentials)

Client:

with open('ca.crt', 'rb') as f:
    trusted_certs = f.read()

credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel('localhost:50051', credentials)


Common Patterns

Metadata (Headers)

Send custom headers with requests:

# Client (metadata keys must be lowercase)
import uuid

metadata = [
    ('authorization', 'Bearer token123'),
    ('request-id', str(uuid.uuid4())),
]
response = stub.Add(request, metadata=metadata)

# Server - read metadata
def Add(self, request, context):
    metadata = dict(context.invocation_metadata())
    auth_token = metadata.get('authorization')
    print(f"Auth: {auth_token}")
    # ... process request ...
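
Converting the metadata to a dict keeps only the last value when a key is repeated, and the raw header still carries the "Bearer " prefix. A small sketch of a parser that works on the (key, value) pairs directly is shown below; bearer_token is a hypothetical helper, not part of gRPC.

```python
def bearer_token(metadata_pairs):
    """Return the token from the first 'authorization: Bearer <token>' pair, or None."""
    for key, value in metadata_pairs:
        if key.lower() == "authorization":
            scheme, _, token = value.partition(" ")
            if scheme.lower() == "bearer" and token.strip():
                return token.strip()
    return None


# Intended use inside a servicer method (not executed here):
#   token = bearer_token(context.invocation_metadata())
#   if token is None:
#       context.abort(grpc.StatusCode.UNAUTHENTICATED, 'Missing bearer token')
print(bearer_token([("request-id", "abc"), ("authorization", "Bearer token123")]))  # token123
```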

Streaming (Advanced)

Server streaming example:

service Calculator {
  rpc Stream(StreamRequest) returns (stream Result);
}

# Server
import time

def Stream(self, request, context):
    for i in range(10):
        yield calculator_pb2.Result(value=i)
        time.sleep(0.5)

# Client
for response in stub.Stream(request):
    print(f"Received: {response.value}")

Troubleshooting

Problem: "Address already in use"

Solution: Stop the process that is still bound to the port:

# Find the PID of the process using port 50051
lsof -ti:50051

# Stop it; add -9 only if the process ignores the default SIGTERM
kill $(lsof -ti:50051)
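
The same check can be made from Python before the server starts, which gives a clearer error than a failed bind. This is a minimal sketch using only the standard library; port_in_use is a hypothetical helper and it only detects a TCP listener reachable at the given IPv4 address.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1", timeout: float = 0.5) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising an exception
        return sock.connect_ex((host, port)) == 0


if port_in_use(50051):
    print("Port 50051 is already taken; stop the other process or pick another port")
```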

Problem: Changes to .proto not reflected

Solution: Regenerate Python code:

python -m grpc_tools.protoc -I./proto --python_out=./generated --grpc_python_out=./generated ./proto/calculator.proto

Problem: Import errors

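Solution: The generated *_pb2_grpc.py imports its *_pb2 module by bare name, so the output directory must be on the Python path: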

import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'generated'))


Best Practices

  • Always close channels when done
  • Reuse channels across multiple calls
  • Set reasonable timeouts (default is infinite!)
  • Handle all error codes explicitly
  • Use connection pooling for high-load applications
  • Enable TLS in production
  • Log request/response for debugging

  • Don't create new channel per request
  • Don't use infinite timeouts
  • Don't ignore error handling
  • Don't commit generated code to git


Next Steps