gRPC Setup Guide
Understanding gRPC Components
Protocol Buffers (protobuf)
Protocol Buffers are Google's language-neutral, platform-neutral mechanism for serializing structured data.
Key concepts:
- .proto files - Define your service interface and data structures
- Messages - Data structures (like classes/structs)
- Services - RPC endpoints (like REST routes)
- Field numbers - Unique identifiers for fields (1, 2, 3...)
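To make the role of field numbers concrete, here is a minimal sketch of how a single integer field is laid out in the protobuf wire format: the field number and wire type are packed into a tag, followed by the value as a varint. The helper names are hypothetical, and the sketch only handles non-negative integers. Real applications should rely on the generated message classes.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        low7 = value & 0x7F
        value >>= 7
        if value:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_varint_field(field_number: int, value: int) -> bytes:
    """Encode one integer field: tag (field number + wire type 0), then value."""
    tag = (field_number << 3) | 0  # wire type 0 = varint
    return encode_varint(tag) + encode_varint(value)


# Field number 1 holding the value 150
encoded = encode_varint_field(1, 150)
print(encoded.hex())  # 089601
```

Because the field number, not the field name, is what ends up on the wire, renumbering a field that is already in use breaks compatibility with older clients and servers.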
Code Generation
gRPC generates two Python files from your .proto file:
- *_pb2.py - Contains message classes
- *_pb2_grpc.py - Contains service client (stub) and server classes
Detailed Configuration
Server Configuration
Thread Pool Settings
from concurrent import futures
import grpc

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    options=[
        ('grpc.max_send_message_length', 50 * 1024 * 1024),     # 50 MB
        ('grpc.max_receive_message_length', 50 * 1024 * 1024),  # 50 MB
    ]
)
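max_workers: Maximum number of threads that handle requests concurrently. Each in-flight RPC occupies one worker thread, so additional requests wait until a thread becomes free.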
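The effect of max_workers can be seen without gRPC at all, since the server simply hands requests to the executor. The sketch below uses a hypothetical handle_request function as a stand-in for an RPC handler and records how many handlers run at the same time.

```python
import threading
import time
from concurrent import futures

lock = threading.Lock()
active = 0  # handlers currently running
peak = 0    # highest number of handlers seen running at once


def handle_request(_):
    """Stand-in for an RPC handler; tracks how many run at once."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulate work
    with lock:
        active -= 1


# Six "requests" arrive, but only two worker threads exist
with futures.ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(handle_request, range(6)))

print(f"peak concurrency: {peak}")
```

The peak never exceeds max_workers; the remaining requests queue until a thread is free, which is why a pool that is too small shows up as added latency rather than as errors.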
Port Configuration
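# Choose one bind address:
# Loopback only (IPv4) - reachable from the local machine only
server.add_insecure_port('127.0.0.1:50051')
# All interfaces, IPv6 and (on most dual-stack systems) IPv4
server.add_insecure_port('[::]:50051')
# All IPv4 interfaces
server.add_insecure_port('0.0.0.0:50051')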
Graceful Shutdown
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # ... add servicer ...
    server.add_insecure_port('[::]:50051')
    server.start()
    try:
        server.wait_for_termination()
    except KeyboardInterrupt:
        print("\nShutting down gracefully...")
        # stop() returns immediately with an event; wait on it so that
        # in-flight RPCs get up to 5 seconds to finish
        server.stop(grace=5).wait()
Client Configuration
Channel Options
channel = grpc.insecure_channel(
    'localhost:50051',
    options=[
        ('grpc.max_send_message_length', 50 * 1024 * 1024),     # 50 MB
        ('grpc.max_receive_message_length', 50 * 1024 * 1024),  # 50 MB
        ('grpc.keepalive_time_ms', 10000),    # Send a keepalive ping every 10 s
        ('grpc.keepalive_timeout_ms', 5000),  # Wait up to 5 s for the ping reply
    ]
)
Timeout Configuration
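Per-call timeout:
response = stub.Add(request, timeout=5.0)  # Fails with DEADLINE_EXCEEDED after 5 seconds
Default timeout for all calls:
Python stubs have no stub-wide timeout setting, so either pass timeout= on every call or bind it to the method once:
import functools
stub = calculator_pb2_grpc.CalculatorStub(channel)
add = functools.partial(stub.Add, timeout=2.0)  # Default 2s timeout
response = add(request)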
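If a default timeout is wanted for every method on a stub, one option is a thin wrapper that fills in the timeout keyword when the caller omits it. This is a sketch, not part of the gRPC API: DefaultTimeoutStub is a hypothetical helper, and FakeStub stands in for a generated stub so the example runs on its own.

```python
class DefaultTimeoutStub:
    """Hypothetical wrapper that adds a default timeout to every stub method."""

    def __init__(self, stub, timeout):
        self._stub = stub
        self._timeout = timeout

    def __getattr__(self, name):
        # Only reached for names not found on the wrapper itself
        method = getattr(self._stub, name)
        if not callable(method):
            return method

        def call(request, **kwargs):
            kwargs.setdefault("timeout", self._timeout)  # caller's value wins
            return method(request, **kwargs)

        return call


class FakeStub:
    """Stand-in for a generated stub, used only to demonstrate the wrapper."""

    def Add(self, request, timeout=None):
        return f"Add({request}) with timeout={timeout}"


stub = DefaultTimeoutStub(FakeStub(), timeout=2.0)
print(stub.Add("1+2"))               # Add(1+2) with timeout=2.0
print(stub.Add("1+2", timeout=0.5))  # Add(1+2) with timeout=0.5
```

With a real stub the wrapper would be built as DefaultTimeoutStub(calculator_pb2_grpc.CalculatorStub(channel), timeout=2.0), and calls that need a longer deadline can still pass their own timeout.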
Error Handling
Status Codes
Common gRPC status codes:
| Code | Meaning | When to Use |
|---|---|---|
| OK | Success | Default for successful operations |
| INVALID_ARGUMENT | Bad input | Division by zero, invalid parameters |
| NOT_FOUND | Resource missing | Key doesn't exist |
| ALREADY_EXISTS | Duplicate | Creating existing resource |
| DEADLINE_EXCEEDED | Timeout | Operation took too long |
| UNAVAILABLE | Service down | Server unreachable |
| INTERNAL | Server error | Unexpected server-side errors |
Setting Error Status (Server)
def Divide(self, request, context):
    if request.b == 0:
        context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
        context.set_details('Division by zero is not allowed')
        return calculator_pb2.Result()  # Empty result
    return calculator_pb2.Result(value=request.a / request.b)
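When a servicer has many methods, choosing the status code in one place keeps error reporting consistent. The sketch below maps Python exception types to the status code names from the table above. The mapping itself and the status_name_for helper are assumptions for illustration, not a gRPC convention, so adjust them to your service's semantics.

```python
# Assumed mapping for illustration; order matters because the first match wins.
EXCEPTION_TO_STATUS = (
    (ZeroDivisionError, "INVALID_ARGUMENT"),
    (ValueError, "INVALID_ARGUMENT"),
    (KeyError, "NOT_FOUND"),
    (FileExistsError, "ALREADY_EXISTS"),
    (TimeoutError, "DEADLINE_EXCEEDED"),
)


def status_name_for(exc: Exception) -> str:
    """Return the gRPC status code name that best describes `exc`."""
    for exc_type, name in EXCEPTION_TO_STATUS:
        if isinstance(exc, exc_type):
            return name
    return "INTERNAL"  # anything unexpected is a server-side error


# In a servicer method the name can be turned into the enum member with
# grpc.StatusCode[status_name_for(exc)] and passed to context.set_code().
try:
    10 / 0
except Exception as exc:
    print(status_name_for(exc))  # INVALID_ARGUMENT
```

Keeping the mapping as plain names lets it be unit tested without a running server.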
Handling Errors (Client)
try:
    response = stub.Divide(calculator_pb2.BinaryOperation(a=10, b=0))
    print(f"Result: {response.value}")
except grpc.RpcError as e:
    print(f"Error: {e.code()}")
    print(f"Details: {e.details()}")
    if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
        print("Invalid input provided")
    elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        print("Request timed out")
Logging and Debugging
Enable gRPC Logging
import logging
# Enable debug logging
logging.basicConfig(level=logging.DEBUG)
# Or configure gRPC-specific logging
grpc_logger = logging.getLogger('grpc')
grpc_logger.setLevel(logging.DEBUG)
Environment Variables
# Enable verbose logging
export GRPC_VERBOSITY=DEBUG
export GRPC_TRACE=all
# Run your application
python server.py
Interceptors for Logging
class LoggingInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        print(f"Request: {handler_call_details.method}")
        return continuation(handler_call_details)

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[LoggingInterceptor()]
)
Testing with grpcurl
grpcurl is a command-line tool for interacting with gRPC servers.
Installation
# macOS
brew install grpcurl
# Linux
wget https://github.com/fullstorydev/grpcurl/releases/download/v1.8.9/grpcurl_1.8.9_linux_x86_64.tar.gz
tar -xvf grpcurl_1.8.9_linux_x86_64.tar.gz
sudo mv grpcurl /usr/local/bin/
Usage
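The commands below assume the server has gRPC reflection enabled and registers the service as calculator.Calculator; adjust the names to match your .proto file.
List services:
grpcurl -plaintext localhost:50051 list
Describe a service:
grpcurl -plaintext localhost:50051 describe calculator.Calculator
Make an RPC call:
grpcurl -plaintext -d '{"a": 10, "b": 5}' localhost:50051 calculator.Calculator/Add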
Performance Tuning
Connection Reuse
# Bad - creates new channel per request
def bad_client():
    for i in range(1000):
        channel = grpc.insecure_channel('localhost:50051')
        stub = calculator_pb2_grpc.CalculatorStub(channel)
        response = stub.Add(calculator_pb2.BinaryOperation(a=i, b=1))
        channel.close()

# Good - reuses channel
def good_client():
    channel = grpc.insecure_channel('localhost:50051')
    stub = calculator_pb2_grpc.CalculatorStub(channel)
    for i in range(1000):
        response = stub.Add(calculator_pb2.BinaryOperation(a=i, b=1))
    channel.close()
Async/Await (Python 3.7+)
For high-performance applications:
import asyncio
import grpc.aio

async def async_add(a, b):
    async with grpc.aio.insecure_channel('localhost:50051') as channel:
        stub = calculator_pb2_grpc.CalculatorStub(channel)
        response = await stub.Add(calculator_pb2.BinaryOperation(a=a, b=b))
        return response.value

# Run async function
result = asyncio.run(async_add(10, 5))
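The performance benefit of the async API comes from keeping many calls in flight on a single thread. The sketch below illustrates this with asyncio.gather; fake_add is a hypothetical stand-in for `await stub.Add(...)` that sleeps to mimic network latency, so the example runs without a server.

```python
import asyncio
import time


async def fake_add(a, b):
    """Stand-in for `await stub.Add(...)`; sleeps to mimic network latency."""
    await asyncio.sleep(0.1)
    return a + b


async def main():
    start = time.perf_counter()
    # All five calls are in flight at once on a single thread
    results = await asyncio.gather(*(fake_add(i, 1) for i in range(5)))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)  # [1, 2, 3, 4, 5]
print(f"elapsed: {elapsed:.2f}s")
```

The five calls finish in roughly the time of one, whereas awaiting them one after another would take about five times as long. With a real stub, the calls should share one channel rather than opening a channel per call.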
Security (Optional)
TLS/SSL Configuration
For production use, enable TLS:
Server:
with open('server.key', 'rb') as f:
    private_key = f.read()
with open('server.crt', 'rb') as f:
    certificate_chain = f.read()
server_credentials = grpc.ssl_server_credentials(
    ((private_key, certificate_chain),)
)
server.add_secure_port('[::]:50051', server_credentials)
Client:
with open('ca.crt', 'rb') as f:
    trusted_certs = f.read()
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
channel = grpc.secure_channel('localhost:50051', credentials)
Common Patterns
Metadata (Headers)
Send custom headers with requests:
# Client
import uuid

metadata = [
    ('authorization', 'Bearer token123'),
    ('request-id', str(uuid.uuid4())),
]
response = stub.Add(request, metadata=metadata)

# Server - read metadata
def Add(self, request, context):
    metadata = dict(context.invocation_metadata())
    auth_token = metadata.get('authorization')
    print(f"Auth: {auth_token}")
    # ... process request ...
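On the server side, the authorization value usually needs to be parsed before it can be checked. The sketch below pulls the token out of a `Bearer ...` header. The bearer_token helper is hypothetical, and plain tuples stand in for the key/value pairs that context.invocation_metadata() returns.

```python
def bearer_token(invocation_metadata):
    """Return the bearer token from (key, value) metadata pairs, or None."""
    metadata = dict(invocation_metadata)
    header = metadata.get("authorization", "")
    scheme, _, token = header.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return None
    return token


# Plain tuples stand in for what context.invocation_metadata() returns
print(bearer_token([("authorization", "Bearer token123"), ("request-id", "42")]))  # token123
print(bearer_token([("request-id", "42")]))  # None
```

A servicer method could call bearer_token(context.invocation_metadata()) and set the UNAUTHENTICATED status code when it returns None.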
Streaming (Advanced)
Server streaming example:
# Server
import time

def Stream(self, request, context):
    for i in range(10):
        yield calculator_pb2.Result(value=i)
        time.sleep(0.5)

# Client
for response in stub.Stream(request):
    print(f"Received: {response.value}")
Troubleshooting
Problem: "Address already in use"
Solution: Find and kill the process that is holding the port (macOS/Linux, assuming port 50051):
lsof -ti:50051 | xargs kill
Problem: Changes to .proto not reflected
Solution: Regenerate Python code:
python -m grpc_tools.protoc -I./proto --python_out=./generated --grpc_python_out=./generated ./proto/calculator.proto
Problem: Import errors
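Solution: Add the directory that contains the generated modules to the Python path (assuming ./generated, as in the command above):
export PYTHONPATH="$PYTHONPATH:./generated"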
Best Practices
✅ Always close channels when done
✅ Reuse channels across multiple calls
✅ Set reasonable timeouts (default is infinite!)
✅ Handle all error codes explicitly
✅ Use connection pooling for high-load applications
✅ Enable TLS in production
✅ Log request/response for debugging
❌ Don't create new channel per request
❌ Don't use infinite timeouts
❌ Don't ignore error handling
❌ Don't commit generated code to git
Next Steps
- Review Code Examples for complete implementations
- Check Testing Guide for testing strategies
- Start with Part 1: Basic RPC