"""
Base HTTP Client - Shared Logic for Sync and Async Clients
This module contains BaseHTTPClient with shared validation, retry logic,
circuit breaker, statistics, and utilities used by both HTTPClient and
AsyncHTTPClient.
"""
from __future__ import annotations
import fnmatch
import logging
import time
from collections import deque
from typing import Any, Optional, TypeAlias, Union
from urllib.parse import quote
import httpx
logger = logging.getLogger("hfortix.http.base")
# Type alias for API responses
HTTPResponse: TypeAlias = dict[str, Any]
__all__ = ["BaseHTTPClient", "HTTPResponse"]
[docs]
class BaseHTTPClient:
    """
    Base class for HTTP clients with shared logic.

    Provides:
    - Parameter validation
    - URL building
    - Retry statistics
    - Circuit breaker state management
    - Endpoint timeout configuration
    - Path normalization and encoding
    - Data sanitization
    """

    # Placeholder written in place of any value whose key looks sensitive.
    _REDACTED = "***REDACTED***"

    # A dictionary key is redacted when its lower-cased text CONTAINS any of
    # these fragments (substring match, so "api_key_id" is redacted too).
    _SENSITIVE_KEY_PARTS = (
        "password",
        "passwd",
        "secret",
        "token",
        "key",
        "private-key",
        "passphrase",
        "psk",
        "api_key",
        "api-key",
        "apikey",
        "auth",
        "authorization",
        "vdom",  # Virtual domain names can reveal customer/tenant info
    )

    # Fragments of a lower-cased error message that mark a connection failure
    # as an SSL/certificate problem (treated as permanent, never retried).
    _SSL_ERROR_MARKERS = (
        "certificate_verify_failed",
        "ssl:",
        "certificate",
        "cert verification",
        "handshake",
        "certificate is not valid",
    )

    def __init__(
        self,
        url: str,
        verify: bool = True,
        vdom: Optional[str] = None,
        max_retries: int = 3,
        connect_timeout: float = 10.0,
        read_timeout: float = 300.0,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_timeout: float = 60.0,
        max_connections: int = 100,
        max_keepalive_connections: int = 20,
        adaptive_retry: bool = False,
        retry_strategy: str = "exponential",
        retry_jitter: bool = False,
    ) -> None:
        """Initialize base HTTP client with shared configuration

        Args:
            url: Base URL of the device; trailing slashes are stripped.
            verify: Stored as-is on the instance (presumably the TLS
                verification flag used by subclasses - confirm there).
            vdom: Optional virtual domain; added to the logging context.
            max_retries: Upper bound on retry attempts (0-100).
            connect_timeout: Must be > 0 (presumably seconds).
            read_timeout: Must be > 0 (presumably seconds).
            circuit_breaker_threshold: Consecutive failures that open the
                circuit breaker. Must be > 0.
            circuit_breaker_timeout: Seconds the breaker stays open before
                moving to half-open. Must be > 0.
            max_connections: Must be > 0.
            max_keepalive_connections: Must be >= 0; clamped (with a
                warning) to ``max_connections`` when larger.
            adaptive_retry: Enable adaptive retry with backpressure detection
                (default: False). When enabled, response times are recorded
                and retry delays are scaled up for slow/overloaded endpoints.
            retry_strategy: Retry backoff strategy - 'exponential' (default)
                or 'linear'. Exponential: 1s, 2s, 4s, 8s, 16s, then 30s.
                Linear: 1s, 2s, 3s, ... capped at 30s.
            retry_jitter: Add random jitter (0-25% of delay) to retry delays
                to prevent thundering herd problem when multiple
                clients retry simultaneously (default: False).

        Raises:
            ValueError: If any argument is outside the ranges listed above.
        """
        # Validate parameters
        if not url:
            raise ValueError("URL is required")
        if max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        if max_retries > 100:
            raise ValueError("max_retries must be <= 100")
        if connect_timeout <= 0:
            raise ValueError("connect_timeout must be > 0")
        if read_timeout <= 0:
            raise ValueError("read_timeout must be > 0")
        if circuit_breaker_threshold <= 0:
            raise ValueError("circuit_breaker_threshold must be > 0")
        if circuit_breaker_timeout <= 0:
            raise ValueError("circuit_breaker_timeout must be > 0")
        if max_connections <= 0:
            raise ValueError("max_connections must be > 0")
        if max_keepalive_connections < 0:
            raise ValueError("max_keepalive_connections must be >= 0")
        if retry_strategy not in ("exponential", "linear"):
            raise ValueError(
                "retry_strategy must be 'exponential' or 'linear'"
            )

        # Auto-adjust keepalive connections instead of raising: the two
        # limits may legitimately be configured independently, so clamp and
        # warn rather than block the caller.
        if max_keepalive_connections > max_connections:
            logger.warning(
                "max_keepalive_connections (%s) > max_connections (%s). "
                "Adjusting max_keepalive_connections to %s.",
                max_keepalive_connections,
                max_connections,
                max_connections,
            )
            max_keepalive_connections = max_connections

        # Store configuration
        self._url = url.rstrip("/")
        self._verify = verify
        self._vdom = vdom
        self._max_retries = max_retries
        self._connect_timeout = connect_timeout
        self._read_timeout = read_timeout
        # Keep the (possibly clamped) pool limits on the instance; without
        # this the adjustment above is lost as soon as __init__ returns.
        self._max_connections = max_connections
        self._max_keepalive_connections = max_keepalive_connections

        # Initialize retry statistics
        self._retry_stats: dict[str, Any] = {
            "total_retries": 0,
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "retry_by_reason": {},
            "retry_by_endpoint": {},
            "last_retry_time": None,
        }

        # Initialize circuit breaker state
        self._circuit_breaker: dict[str, Any] = {
            "consecutive_failures": 0,
            "last_failure_time": None,
            "state": "closed",  # closed, open, half_open
            "failure_threshold": circuit_breaker_threshold,
            "timeout": circuit_breaker_timeout,
        }

        # Per-endpoint timeout overrides: fnmatch pattern -> timeout
        self._endpoint_timeouts: dict[str, httpx.Timeout] = {}

        # Adaptive retry configuration
        self._adaptive_retry = adaptive_retry
        self._retry_strategy = retry_strategy
        self._retry_jitter = retry_jitter
        # endpoint -> deque of recent response times (seconds)
        self._response_times: dict[str, deque] = {}
        # 500ms baseline
        self._baseline_response_time = 0.5
        # Endpoint is slow if its average exceeds 3x the baseline
        self._slowdown_multiplier = 3.0

    # ========================================================================
    # Shared Utility Methods
    # ========================================================================

    @staticmethod
    def _sanitize_data(data: Optional[dict[str, Any]]) -> dict[str, Any]:
        """
        Remove sensitive fields from data before logging (recursive)

        Walks nested dictionaries and lists and replaces the value of every
        dictionary entry whose key contains a sensitive fragment (password,
        token, key, vdom, ...). The input is never modified; new containers
        are built for every dict/list encountered.

        Args:
            data: Data to sanitize (dict, list, or any value). Falsy input
                (None, {}, []) yields an empty dict.

        Returns:
            Sanitized copy of data with sensitive values redacted

        Examples:
            >>> _sanitize_data({'password': 'secret123', 'name': 'test'})
            {'password': '***REDACTED***', 'name': 'test'}
            >>> _sanitize_data({'users': [{'name': 'admin', 'key': 'abc'}]})
            {'users': [{'name': 'admin', 'key': '***REDACTED***'}]}
        """
        if not data:
            return {}

        fragments = BaseHTTPClient._SENSITIVE_KEY_PARTS
        redacted = BaseHTTPClient._REDACTED

        def is_sensitive(key: Any) -> bool:
            # str() so non-string keys (ints, tuples, ...) cannot crash the
            # logging path with an AttributeError on .lower().
            lowered = str(key).lower()
            return any(fragment in lowered for fragment in fragments)

        def scrub(obj: Any) -> Any:
            """Recursively sanitize nested structures"""
            if isinstance(obj, dict):
                return {
                    key: redacted if is_sensitive(key) else scrub(value)
                    for key, value in obj.items()
                }
            if isinstance(obj, list):
                return [scrub(item) for item in obj]
            return obj

        return scrub(data)

    def _log_context(
        self,
        request_id: Optional[str] = None,
        **extra_fields: Any,
    ) -> dict[str, Any]:
        """
        Build consistent logging context for structured logging

        Collects the request id, the configured vdom and (when a subclass
        has set ``_adom``) the adom, then overlays the caller's extra fields.
        Extra fields win on key collisions because they are applied last.

        Args:
            request_id: Optional request ID for correlation
            **extra_fields: Additional fields to include (endpoint, method,
                status_code, duration_seconds, etc.)

        Returns:
            Dictionary with logging context ready for logger.info(extra=...)

        Examples:
            >>> ctx = self._log_context(request_id="abc123",
            ...     endpoint="/api/v2/cmdb/firewall/policy",
            ...     method="GET")
            >>> logger.info("Request started", extra=ctx)
        """
        context: dict[str, Any] = {}
        if request_id:
            context["request_id"] = request_id
        if self._vdom:
            context["vdom"] = self._vdom
        # _adom is not set by this class; getattr keeps the lookup safe for
        # clients that never define it.
        adom = getattr(self, "_adom", None)
        if adom:
            context["adom"] = adom
        context.update(extra_fields)
        return context

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Normalize API path by removing leading slashes

        Non-string values are returned unchanged.
        """
        if isinstance(path, str):
            return path.lstrip("/")
        return path

    def _build_url(self, api_type: str, path: str) -> str:
        """Build complete API URL from components

        Args:
            api_type: API family segment placed after ``/api/v2/``.
            path: Endpoint path; leading slashes are removed.

        Returns:
            ``<base url>/api/v2/<api_type>/<percent-encoded path>``
        """
        path = self._normalize_path(path)
        # "/" and "%" are left untouched so path separators survive and
        # already percent-encoded segments are not encoded a second time.
        encoded_path = quote(str(path), safe="/%")
        return f"{self._url}/api/v2/{api_type}/{encoded_path}"

    # ========================================================================
    # Statistics Methods
    # ========================================================================

    def _snapshot_retry_stats(self) -> dict[str, Any]:
        """Return a copy of the retry stats that shares no mutable state."""
        snapshot = dict(self._retry_stats)
        # The per-reason / per-endpoint counters are nested dicts; a shallow
        # copy would hand callers a live reference to internal state.
        for nested in ("retry_by_reason", "retry_by_endpoint"):
            snapshot[nested] = dict(self._retry_stats[nested])
        return snapshot

    def get_retry_stats(self) -> dict[str, Any]:
        """Get retry statistics

        Returns:
            Independent snapshot of the retry counters; mutating it (or its
            nested per-reason / per-endpoint dicts) does not affect the
            client.
        """
        return self._snapshot_retry_stats()

    def get_circuit_breaker_state(self) -> dict[str, Any]:
        """Get current circuit breaker state (as a copy)"""
        return self._circuit_breaker.copy()

    def _record_retry(self, reason: str, endpoint: str) -> None:
        """Record retry attempt in statistics

        Args:
            reason: Retry category used as the ``retry_by_reason`` key.
            endpoint: Endpoint used as the ``retry_by_endpoint`` key.
        """
        stats = self._retry_stats
        stats["total_retries"] += 1
        stats["last_retry_time"] = time.time()

        by_reason = stats["retry_by_reason"]
        by_reason[reason] = by_reason.get(reason, 0) + 1

        by_endpoint = stats["retry_by_endpoint"]
        by_endpoint[endpoint] = by_endpoint.get(endpoint, 0) + 1

    # ========================================================================
    # Endpoint Timeout Configuration
    # ========================================================================

    def _get_endpoint_timeout(self, endpoint: str) -> Optional[httpx.Timeout]:
        """Get custom timeout for specific endpoint if configured

        Patterns are tried in insertion order with ``fnmatch``; the first
        match wins.

        Returns:
            The matching timeout, or None when no pattern matches.
        """
        for pattern, timeout in self._endpoint_timeouts.items():
            if fnmatch.fnmatch(endpoint, pattern):
                return timeout
        return None

    # ========================================================================
    # Circuit Breaker Methods
    # ========================================================================

    def _check_circuit_breaker(self, endpoint: str) -> None:
        """Check circuit breaker state before making request

        Does nothing unless the breaker is open. While open and still inside
        the timeout window the request is rejected; once the window has
        elapsed the breaker moves to half-open and the request may proceed.

        Raises:
            CircuitBreakerOpenError: Breaker is open and the timeout has not
                elapsed yet.
        """
        breaker = self._circuit_breaker
        if breaker["state"] != "open":
            return

        elapsed = time.time() - (breaker["last_failure_time"] or 0)
        if elapsed >= breaker["timeout"]:
            breaker["state"] = "half_open"
            logger.info("Circuit breaker transitioning to HALF_OPEN state")
            return

        remaining = breaker["timeout"] - elapsed
        logger.error(
            "Circuit breaker is OPEN - service unavailable (retry in %.1fs)",  # noqa: E501
            remaining,
        )
        # Imported lazily, only on the rejection path.
        from hfortix_core.exceptions import CircuitBreakerOpenError

        raise CircuitBreakerOpenError(
            f"Circuit breaker is OPEN for {endpoint}. "
            f"Service appears to be down. Retry in {remaining:.1f}s"
        )

    def _record_circuit_breaker_success(self) -> None:
        """Record successful request in circuit breaker

        A success while half-open closes the breaker; a success while closed
        just clears the failure streak. An open breaker is left untouched.
        """
        breaker = self._circuit_breaker
        state = breaker["state"]
        if state == "half_open":
            breaker["state"] = "closed"
            breaker["consecutive_failures"] = 0
            logger.info("Circuit breaker CLOSED after successful request")
        elif state == "closed":
            breaker["consecutive_failures"] = 0

    def _record_circuit_breaker_failure(self, endpoint: str) -> None:
        """Record failed request in circuit breaker

        Increments the failure streak and opens the breaker once the streak
        reaches the configured threshold (unless it is already open).

        Args:
            endpoint: Endpoint that failed; used only in the log message.
        """
        breaker = self._circuit_breaker
        breaker["consecutive_failures"] += 1
        breaker["last_failure_time"] = time.time()

        failures = breaker["consecutive_failures"]
        if failures >= breaker["failure_threshold"] and (
            breaker["state"] != "open"
        ):
            breaker["state"] = "open"
            logger.error(
                "Circuit breaker OPENED after %d consecutive "
                "failures for endpoint %s",
                failures,
                endpoint,
            )

    def reset_circuit_breaker(self) -> None:
        """Reset circuit breaker to closed state"""
        self._circuit_breaker["state"] = "closed"
        self._circuit_breaker["consecutive_failures"] = 0
        self._circuit_breaker["last_failure_time"] = None
        logger.info("Circuit breaker manually reset to CLOSED state")

    # ========================================================================
    # Retry Logic
    # ========================================================================

    def _should_retry(
        self, error: Exception, attempt: int, endpoint: str = ""
    ) -> bool:
        """Determine if a request should be retried

        Retries (and records the retry in the statistics) for connection /
        network errors that are not SSL related, read/write/pool timeouts,
        HTTP 429 and HTTP 500-504. Everything else is not retried.

        Args:
            error: Exception raised by the failed request.
            attempt: Zero-based attempt number that just failed.
            endpoint: Endpoint being requested (statistics and logging).

        Returns:
            True when the caller should retry, False otherwise.
        """
        if attempt >= self._max_retries:
            return False

        if isinstance(error, (httpx.ConnectError, httpx.NetworkError)):
            # SSL/certificate failures are permanent: retrying cannot fix a
            # bad certificate, so bail out before counting a retry.
            message = str(error).lower()
            if any(marker in message for marker in self._SSL_ERROR_MARKERS):
                logger.error(
                    "SSL/Certificate error (not retrying) for %s: %s",
                    endpoint,
                    error,
                )
                return False

            self._record_retry("connection_error", endpoint)
            logger.warning(
                "Connection error on attempt %d/%d for %s: %s",
                attempt + 1,
                self._max_retries,
                endpoint,
                error,
            )
            return True

        # NOTE(review): httpx.ConnectTimeout is not in this tuple, so connect
        # timeouts are not retried - confirm that is intentional.
        if isinstance(
            error, (httpx.ReadTimeout, httpx.WriteTimeout, httpx.PoolTimeout)
        ):
            self._record_retry("timeout", endpoint)
            logger.warning(
                "Timeout on attempt %d/%d for %s: %s",
                attempt + 1,
                self._max_retries,
                endpoint,
                error,
            )
            return True

        # Retry on HTTP status errors (429, 500-504)
        if isinstance(error, httpx.HTTPStatusError):
            status = error.response.status_code
            if status == 429:  # Rate limit
                self._record_retry("rate_limit", endpoint)
                logger.warning(
                    "Rate limit hit on attempt %d/%d for %s",
                    attempt + 1,
                    self._max_retries,
                    endpoint,
                )
                return True
            if 500 <= status <= 504:  # Server errors
                self._record_retry("server_error", endpoint)
                logger.warning(
                    "Server error %d on attempt %d/%d for %s",
                    status,
                    attempt + 1,
                    self._max_retries,
                    endpoint,
                )
                return True

        return False

    def _get_retry_delay(
        self,
        attempt: int,
        response: Optional[httpx.Response] = None,
        endpoint: Optional[str] = None,
    ) -> float:
        """
        Calculate retry delay with optional adaptive backpressure

        A usable ``Retry-After`` header (a finite, non-negative number of
        seconds) takes precedence and is returned as-is. Otherwise the delay
        comes from the configured backoff strategy, is optionally scaled by
        adaptive backpressure, and optionally gets 0-25% random jitter.

        Args:
            attempt: Current retry attempt number (0-indexed)
            response: HTTP response object (if available)
            endpoint: Endpoint being retried (for adaptive logic)

        Returns:
            Delay in seconds before next retry
        """
        if response is not None and "Retry-After" in response.headers:
            try:
                retry_after = float(response.headers["Retry-After"])
            except ValueError:
                # e.g. the HTTP-date form of the header; use backoff instead.
                retry_after = None
            # Reject negative, infinite and NaN values (NaN fails both
            # comparisons): they are not delays a caller can sleep for.
            if retry_after is not None and 0 <= retry_after < float("inf"):
                return retry_after

        if self._retry_strategy == "exponential":
            # Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s
            delay = float(min(2**attempt, 30.0))
        else:  # linear
            # Linear backoff: 1s, 2s, 3s, ... max 30s
            delay = min((attempt + 1) * 1.0, 30.0)

        if self._adaptive_retry and endpoint:
            delay = self._apply_adaptive_backpressure(
                delay, response, endpoint
            )

        if self._retry_jitter:
            import random

            # Not security sensitive: jitter only de-synchronises clients.
            jitter_amount = delay * random.uniform(0, 0.25)  # nosec B311
            delay = delay + jitter_amount
            logger.debug(
                "Applied jitter to retry delay: %.2fs + %.2fs jitter = %.2fs",
                delay - jitter_amount,
                jitter_amount,
                delay,
            )

        return delay

    def _apply_adaptive_backpressure(
        self,
        base_delay: float,
        response: Optional[httpx.Response],
        endpoint: str,
    ) -> float:
        """
        Apply adaptive backpressure based on FortiGate health signals

        A 503 response triples the delay; otherwise an endpoint whose average
        recorded response time is above the slow threshold doubles it.

        Args:
            base_delay: Delay produced by the backoff strategy
            response: HTTP response (if available)
            endpoint: Endpoint being retried

        Returns:
            Adjusted delay, capped at 120 seconds
        """
        multiplier = 1.0

        # Signal 1: Explicit 503 Service Unavailable (FortiGate overloaded)
        if response is not None and response.status_code == 503:
            multiplier = 3.0
            logger.warning(
                "FortiGate returned 503 (overloaded), applying 3x "
                "backpressure multiplier"
            )
        # Signal 2: Endpoint showing slow response times (early warning)
        elif self._is_endpoint_slow(endpoint):
            multiplier = 2.0
            logger.warning(
                "Endpoint %s showing backpressure (avg response: "
                "%.2fs, baseline: %.2fs), applying 2x multiplier",
                endpoint,
                self._get_avg_response_time(endpoint),
                self._baseline_response_time,
            )

        # Cap maximum delay at 2 minutes
        return min(base_delay * multiplier, 120.0)

    def _record_response_time(self, endpoint: str, duration: float) -> None:
        """
        Record response time for adaptive backpressure detection

        No-op when adaptive retry is disabled. Only the most recent 100
        samples per endpoint are kept.

        Args:
            endpoint: API endpoint (e.g., 'cmdb/firewall/address')
            duration: Response time in seconds
        """
        if not self._adaptive_retry:
            return  # Zero overhead when disabled
        samples = self._response_times.get(endpoint)
        if samples is None:
            samples = self._response_times[endpoint] = deque(maxlen=100)
        samples.append(duration)

    def _get_avg_response_time(self, endpoint: str) -> float:
        """
        Get average response time for endpoint

        Args:
            endpoint: API endpoint

        Returns:
            Average response time in seconds, or 0.0 if no data
        """
        samples = self._response_times.get(endpoint)
        if not samples:
            return 0.0
        return sum(samples) / len(samples)

    def _is_endpoint_slow(self, endpoint: str) -> bool:
        """
        Detect if endpoint is responding slowly (backpressure signal)

        Args:
            endpoint: API endpoint to check

        Returns:
            True if the endpoint's average response time exceeds
            baseline * slowdown multiplier; False when there is no data
        """
        avg_time = self._get_avg_response_time(endpoint)
        # An average of exactly 0.0 is treated as "no data yet": healthy.
        if avg_time == 0.0:
            return False
        threshold = self._baseline_response_time * self._slowdown_multiplier
        return avg_time > threshold

    def get_health_metrics(self) -> dict[str, Any]:
        """
        Get health metrics including adaptive retry stats

        Returns:
            Dictionary with the circuit breaker summary, an independent
            snapshot of the retry statistics, the adaptive-retry flag and,
            when adaptive retry is enabled and samples exist, per-endpoint
            response time figures in milliseconds (count, avg, min, max,
            p50, p95 - p95 is None with 20 or fewer samples - and is_slow).
        """
        breaker = self._circuit_breaker
        metrics: dict[str, Any] = {
            "circuit_breaker": {
                "state": breaker["state"],
                "consecutive_failures": breaker["consecutive_failures"],
                "threshold": breaker["failure_threshold"],
            },
            "retry_stats": self._snapshot_retry_stats(),
            "adaptive_retry_enabled": self._adaptive_retry,
        }

        if self._adaptive_retry and self._response_times:
            per_endpoint: dict[str, Any] = {}
            for endpoint, samples in self._response_times.items():
                if not samples:
                    continue
                ordered = sorted(samples)
                count = len(ordered)
                per_endpoint[endpoint] = {
                    "count": count,
                    "avg_ms": round(sum(ordered) / count * 1000, 2),
                    "min_ms": round(ordered[0] * 1000, 2),
                    "max_ms": round(ordered[-1] * 1000, 2),
                    "p50_ms": round(ordered[count // 2] * 1000, 2),
                    # Too few samples make a 95th percentile meaningless.
                    "p95_ms": (
                        round(ordered[int(count * 0.95)] * 1000, 2)
                        if count > 20
                        else None
                    ),
                    "is_slow": self._is_endpoint_slow(endpoint),
                }
            metrics["response_times"] = per_endpoint

        return metrics

    # ========================================================================
    # Validation Helper Methods
    # ========================================================================

    @staticmethod
    def _validate_api_type(api_type: str) -> None:
        """Validate API type parameter

        Raises:
            ValueError: If api_type is not cmdb, monitor, log or service.
        """
        valid_types = {"cmdb", "monitor", "log", "service"}
        if api_type not in valid_types:
            raise ValueError(
                f"Invalid api_type '{api_type}'. Must be one of: "
                f"{', '.join(sorted(valid_types))}"
            )

    @staticmethod
    def _validate_path(path: str) -> None:
        """Validate path parameter

        Raises:
            ValueError: If path is empty or not a string.
        """
        if not path or not isinstance(path, str):
            raise ValueError("path must be a non-empty string")

    @staticmethod
    def _validate_data(data: Any) -> None:
        """Validate data parameter for POST/PUT

        Raises:
            TypeError: If data is not a dictionary.
        """
        if not isinstance(data, dict):
            raise TypeError(
                f"data must be a dictionary, got {type(data).__name__}"
            )

    @staticmethod
    def _validate_vdom(vdom: Optional[Union[str, bool]]) -> None:
        """Validate vdom parameter

        Raises:
            TypeError: If vdom is neither None, a str nor a bool.
        """
        if vdom is not None and not isinstance(vdom, (str, bool)):
            raise TypeError(
                f"vdom must be str, bool, or None, got {type(vdom).__name__}"
            )

    @staticmethod
    def _validate_params(params: Optional[dict[str, Any]]) -> None:
        """Validate params parameter

        Raises:
            TypeError: If params is neither None nor a dictionary.
        """
        if params is not None and not isinstance(params, dict):
            raise TypeError(
                f"params must be a dictionary or None, got "
                f"{type(params).__name__}"
            )