Source code for hfortix_core.http.fmg_client

"""
FortiManager HTTP Client

HTTP client for FortiManager JSON-RPC API with session-based authentication.
Shares retry logic, circuit breaker, and connection pooling with HTTPClient.
"""

from __future__ import annotations

import logging
import time
from typing import Any, Literal, Optional

import httpx

from .base import BaseHTTPClient

logger = logging.getLogger("hfortix.http.fmg")

__all__ = ["HTTPClientFMG"]


[docs] class HTTPClientFMG(BaseHTTPClient): """ HTTP client for FortiManager JSON-RPC API. Provides session-based authentication and JSON-RPC request handling while reusing the retry logic, circuit breaker, connection pooling, and statistics from BaseHTTPClient. FortiManager uses a different authentication model than FortiOS: - FortiOS: REST API with Bearer token in headers - FortiManager: JSON-RPC API with session token in request body Example: >>> client = HTTPClientFMG( ... url="https://fmg.example.com", ... username="admin", ... password="password", ... ) >>> client.login() >>> response = client.execute("get", [{"url": "/dvmdb/device"}]) >>> client.logout() """
[docs] def __init__( self, url: str, username: str, password: str, verify: bool = True, adom: Optional[str] = None, max_retries: int = 3, connect_timeout: float = 10.0, read_timeout: float = 300.0, circuit_breaker_threshold: int = 5, circuit_breaker_timeout: float = 60.0, max_connections: int = 100, max_keepalive_connections: int = 20, adaptive_retry: bool = False, retry_strategy: str = "exponential", retry_jitter: bool = False, ) -> None: """ Initialize FortiManager HTTP client. Args: url: Base URL for FMG (e.g., "https://fmg.example.com") username: Admin username password: Admin password verify: Verify SSL certificates (default: True) adom: Default ADOM for operations max_retries: Maximum retry attempts on transient failures connect_timeout: Connection timeout in seconds read_timeout: Read timeout in seconds circuit_breaker_threshold: Failures before opening circuit circuit_breaker_timeout: Seconds before retrying after circuit opens max_connections: Maximum connection pool size max_keepalive_connections: Maximum keepalive connections adaptive_retry: Enable adaptive retry with backpressure detection retry_strategy: 'exponential' or 'linear' backoff retry_jitter: Add random jitter to retry delays """ super().__init__( url=url, verify=verify, vdom=None, # FMG uses ADOM, not VDOM max_retries=max_retries, connect_timeout=connect_timeout, read_timeout=read_timeout, circuit_breaker_threshold=circuit_breaker_threshold, circuit_breaker_timeout=circuit_breaker_timeout, max_connections=max_connections, max_keepalive_connections=max_keepalive_connections, adaptive_retry=adaptive_retry, retry_strategy=retry_strategy, retry_jitter=retry_jitter, ) self._username = username self._password = password self._adom = adom # For logging context self._session_token: str | None = None self._request_id: int = 0 # HTTP client with connection pooling self._http_client: httpx.Client | None = None self._max_connections = max_connections self._max_keepalive = max_keepalive_connections
@property def jsonrpc_url(self) -> str: """JSON-RPC endpoint URL.""" return f"{self._url}/jsonrpc" @property def is_authenticated(self) -> bool: """Check if we have a valid session.""" return self._session_token is not None @property def adom(self) -> str | None: """Default ADOM.""" return self._adom def _get_http_client(self) -> httpx.Client: """Get or create HTTP client with connection pooling.""" if self._http_client is None: limits = httpx.Limits( max_connections=self._max_connections, max_keepalive_connections=self._max_keepalive, ) timeout = httpx.Timeout( connect=self._connect_timeout, read=self._read_timeout, write=30.0, pool=10.0, ) self._http_client = httpx.Client( verify=self._verify, limits=limits, timeout=timeout, ) return self._http_client def _next_id(self) -> int: """Get next request ID.""" self._request_id += 1 return self._request_id
[docs] def login(self) -> dict[str, Any]: """ Authenticate with FortiManager. Returns: FMG login response dict with session and status information Raises: RuntimeError: If authentication fails """ if self._session_token: # Already logged in - return success status return { "result": [{"status": {"code": 0, "message": "Already authenticated"}}], "session": self._session_token } request = { "id": self._next_id(), "method": "exec", "params": [ { "url": "/sys/login/user", "data": { "user": self._username, "passwd": self._password, } } ], } logger.debug("Logging in to FortiManager at %s", self._url) client = self._get_http_client() response = client.post(self.jsonrpc_url, json=request) response.raise_for_status() data = response.json() # Check for successful login result = data.get("result", [{}])[0] status = result.get("status", {}) if status.get("code") != 0: error_msg = status.get("message", "Unknown error") logger.error("FMG login failed: %s", error_msg) raise RuntimeError(f"FMG login failed: {error_msg}") self._session_token = data.get("session") if not self._session_token: raise RuntimeError("FMG login succeeded but no session token received") logger.info("Successfully logged in to FortiManager") return data
[docs] def logout(self) -> dict[str, Any]: """ End FortiManager session. Returns: FMG logout response dict with status information """ if not self._session_token: return {"status": {"code": 0, "message": "Not logged in"}} try: request = { "id": self._next_id(), "method": "exec", "params": [{"url": "/sys/logout"}], "session": self._session_token, } client = self._get_http_client() response = client.post(self.jsonrpc_url, json=request) result = response.json() logger.debug("Logged out from FortiManager") return result except Exception as e: logger.debug("Logout error: %s", e) return {"status": {"code": -1, "message": str(e)}} finally: self._session_token = None
[docs] def execute( self, method: Literal["exec", "get", "set", "add", "update", "delete"], params: list[dict[str, Any]], verbose: int = 1, ) -> dict[str, Any]: """ Execute a FortiManager JSON-RPC request. Args: method: JSON-RPC method params: Request parameters verbose: Verbosity level (0 or 1) Returns: FMG response dict Raises: RuntimeError: If not authenticated or request fails """ if not self._session_token: self.login() endpoint = params[0].get("url", "/unknown") if params else "/unknown" # Check circuit breaker self._check_circuit_breaker(endpoint) request = { "id": self._next_id(), "method": method, "params": params, "session": self._session_token, "verbose": verbose, } start_time = time.perf_counter() attempt = 0 last_error: Exception | None = None while attempt <= self._max_retries: try: client = self._get_http_client() response = client.post(self.jsonrpc_url, json=request) response.raise_for_status() data = response.json() # Check for FMG-level errors result = data.get("result", [{}])[0] if data.get("result") else {} status = result.get("status", {}) if status.get("code") != 0: error_msg = status.get("message", "Unknown error") # Session expired - try to re-login if "session" in error_msg.lower() or status.get("code") == -11: self._session_token = None self.login() request["session"] = self._session_token continue raise RuntimeError(f"FMG request failed: {error_msg}") # Success duration = time.perf_counter() - start_time self._record_circuit_breaker_success() self._retry_stats["successful_requests"] += 1 self._retry_stats["total_requests"] += 1 logger.debug( "FMG request completed in %.3fs", duration, extra=self._log_context(endpoint=endpoint, duration_seconds=duration), ) return data except httpx.TimeoutException as e: last_error = e if self._should_retry(e, attempt, endpoint): attempt += 1 self._record_retry("timeout", endpoint) delay = self._calculate_retry_delay(attempt) logger.warning( "Request timeout, retrying in %.1fs (attempt %d/%d)", delay, attempt, self._max_retries + 1, ) time.sleep(delay) continue raise except httpx.HTTPStatusError as e: last_error = e if e.response.status_code >= 500 and self._should_retry(e, attempt, endpoint): attempt += 1 self._record_retry("server_error", endpoint) delay = self._calculate_retry_delay(attempt) logger.warning( "Server error %d, retrying in %.1fs (attempt %d/%d)", e.response.status_code, delay, attempt, self._max_retries + 1, ) time.sleep(delay) continue raise except Exception as e: last_error = e self._record_circuit_breaker_failure(endpoint) raise # All retries exhausted self._retry_stats["failed_requests"] += 1 self._retry_stats["total_requests"] += 1 self._record_circuit_breaker_failure(endpoint) if last_error: raise last_error raise RuntimeError("Request failed after all retries")
def _calculate_retry_delay(self, attempt: int) -> float: """Calculate retry delay based on strategy.""" if self._retry_strategy == "exponential": delay = min(2 ** (attempt - 1), 30.0) # Max 30 seconds else: # linear delay = min(attempt, 5.0) # Max 5 seconds if self._retry_jitter: import random jitter = random.uniform(0, delay * 0.25) delay += jitter return delay
[docs] def proxy_request( self, action: Literal["get", "post", "put", "delete"], resource: str, targets: list[str], payload: dict[str, Any] | None = None, timeout: int = 60, ) -> dict[str, Any]: """ Execute a FortiOS API call through the FMG proxy endpoint. This is the core method for routing FortiOS REST API calls through FortiManager to managed devices. Args: action: HTTP method (get, post, put, delete) resource: FortiOS API resource path (e.g., "/api/v2/cmdb/firewall/address") targets: List of target devices/groups (e.g., ["adom/root/device/fw-01"]) payload: Request body for POST/PUT timeout: Request timeout in seconds Returns: FMG response dict containing results from each target device """ data: dict[str, Any] = { "action": action, "resource": resource, "target": targets, "timeout": timeout, } if payload: data["payload"] = payload params = [ { "url": "/sys/proxy/json", "data": data, } ] return self.execute("exec", params)
[docs] def close(self) -> None: """Close the session and HTTP client.""" self.logout() if self._http_client: self._http_client.close() self._http_client = None
def __enter__(self) -> "HTTPClientFMG": """Context manager entry.""" self.login() return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Context manager exit.""" self.close() # ======================================================================== # Statistics and Health Methods (from BaseHTTPClient) # ========================================================================
[docs] def get_health_metrics(self) -> dict[str, Any]: """Get health metrics for monitoring.""" return { "authenticated": self.is_authenticated, "circuit_breaker": self.get_circuit_breaker_state(), "retry_stats": self.get_retry_stats(), "adom": self._adom, }
[docs] def get_connection_stats(self) -> dict[str, Any]: """Get connection pool statistics.""" stats: dict[str, Any] = { "max_connections": self._max_connections, "max_keepalive": self._max_keepalive, } if self._http_client: # httpx doesn't expose detailed pool stats, but we can track stats["client_active"] = True else: stats["client_active"] = False return stats