import base64
import hashlib
import time
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidSignature
def verify_webhook_signature(
public_key_pem: str,
url: str,
body: bytes,
signature_b64: str,
timestamp: str
) -> bool:
"""
Verify webhook signature to ensure request authenticity.
Args:
public_key_pem: PEM-formatted public key
url: Complete webhook URL
body: Raw request body (bytes)
signature_b64: Base64-encoded signature from X-Webhook-Signature header
timestamp: Timestamp string from X-Webhook-Timestamp header
Returns:
True if signature is valid, False otherwise
"""
# 1. Verify timestamp is recent (prevents replay attacks)
current_time = int(time.time())
request_time = int(timestamp)
if abs(current_time - request_time) > 300: # 5 minute window
print(f"Request timestamp {request_time} is outside acceptable range")
return False
# 2. Reconstruct the signed content
body_hash = hashlib.sha256(body).hexdigest()
signature_content = f"{timestamp}.{url}.{body_hash}".encode('utf-8')
# 3. Hash the content (this is what was actually signed)
content_hash = hashlib.sha256(signature_content).digest()
# 4. Load the public key
try:
public_key = serialization.load_pem_public_key(public_key_pem.encode())
except Exception as e:
print(f"Failed to load public key: {e}")
return False
# 5. Decode the signature
try:
signature = base64.b64decode(signature_b64)
except Exception as e:
print(f"Failed to decode signature: {e}")
return False
# 6. Verify the signature
try:
public_key.verify(
signature,
content_hash,
padding.PKCS1v15(),
hashes.SHA256()
)
return True
except InvalidSignature:
print("Signature verification failed")
return False
# Flask/Django Example
def handle_webhook(request):
"""Example webhook handler with signature verification"""
# Extract headers
signature = request.headers.get('X-Webhook-Signature')
timestamp = request.headers.get('X-Webhook-Timestamp')
if not signature or not timestamp:
return 400, "Missing required headers"
# Get cached public key (implement your caching strategy)
public_key = get_cached_public_key()
# Verify the signature
is_valid = verify_webhook_signature(
public_key,
request.url,
request.body,
signature,
timestamp
)
if not is_valid:
return 401, "Invalid signature"
# Process your webhook logic here
webhook_data = request.json
process_webhook_event(webhook_data)
return 200, "OK"