Source code for hfortix_core.audit.formatters

"""
Audit Log Formatters

Provides different formatting options for audit logs to support
various compliance and logging standards.
"""

from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Any, Protocol, runtime_checkable

__all__ = [
    "AuditFormatter",
    "JSONFormatter",
    "SyslogFormatter",
    "CEFFormatter",
]


def _utc_timestamp() -> str:
    """Return an ISO 8601 UTC timestamp with a trailing Z."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


[docs] @runtime_checkable class AuditFormatter(Protocol): """Protocol for audit log formatters"""
[docs] def format(self, operation: dict[str, Any]) -> str: """ Format an operation dict as a string Args: operation: Operation data dictionary Returns: Formatted string ready for output """ ...
[docs] class JSONFormatter: """ Format audit logs as JSON (default) Outputs compact JSON on a single line, suitable for log aggregation systems like ELK, Splunk, or cloud logging services. Example output: {"timestamp":"2026-01-02T14:23:45Z","method":"POST",...} """
[docs] def __init__(self, pretty: bool = False, indent: int = 2): """ Initialize JSON formatter Args: pretty: If True, format with indentation for readability indent: Number of spaces for indentation (when pretty=True) """ self.pretty = pretty self.indent = indent if pretty else None
[docs] def format(self, operation: dict[str, Any]) -> str: """Format as JSON string""" if self.pretty: return json.dumps(operation, indent=self.indent, sort_keys=True) return json.dumps(operation, separators=(",", ":"))
[docs] class SyslogFormatter: """ Format audit logs for Syslog (RFC 5424) Outputs Syslog-formatted messages with proper priority, timestamp, and structured data. Format: <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA MSG # noqa: E501 Example output: <134>1 2026-01-02T14:23:45Z 192.168.1.99 hfortix - - - {"method":"POST",...} # noqa: E501 """ # Syslog facilities FACILITIES = { "KERN": 0, "USER": 1, "MAIL": 2, "DAEMON": 3, "AUTH": 4, "SYSLOG": 5, "LPR": 6, "NEWS": 7, "UUCP": 8, "CRON": 9, "AUTHPRIV": 10, "FTP": 11, "LOCAL0": 16, "LOCAL1": 17, "LOCAL2": 18, "LOCAL3": 19, "LOCAL4": 20, "LOCAL5": 21, "LOCAL6": 22, "LOCAL7": 23, } # Syslog severities SEVERITIES = { "EMERG": 0, "ALERT": 1, "CRIT": 2, "ERR": 3, "WARNING": 4, "NOTICE": 5, "INFO": 6, "DEBUG": 7, }
[docs] def __init__( self, facility: str = "LOCAL0", severity: str = "INFO", app_name: str = "hfortix", hostname: str | None = None, ): """ Initialize Syslog formatter Args: facility: Syslog facility (LOCAL0-LOCAL7, USER, etc.) severity: Syslog severity (INFO, WARNING, ERR, etc.) app_name: Application name for syslog hostname: Hostname to use (None = use from operation data) """ self.facility = self.FACILITIES.get( facility.upper(), 16 ) # Default LOCAL0 self.severity = self.SEVERITIES.get( severity.upper(), 6 ) # Default INFO self.app_name = app_name self.hostname = hostname
[docs] def format(self, operation: dict[str, Any]) -> str: """ Format as RFC 5424 syslog message Priority is calculated as: (facility * 8) + severity """ # Calculate priority pri = (self.facility * 8) + self.severity # Get hostname from operation or use configured hostname = self.hostname or operation.get("host", "-") # Get timestamp (use operation timestamp or current time) timestamp = operation.get("timestamp", _utc_timestamp()) # Message is the full operation as JSON message = json.dumps(operation, separators=(",", ":")) # RFC 5424 format # <PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA MSG # noqa: E501 return ( f"<{pri}>1 {timestamp} {hostname} {self.app_name} - - - {message}" )
[docs] class CEFFormatter: """ Format audit logs as Common Event Format (CEF) CEF is widely used by SIEM systems like ArcSight, Splunk, QRadar. Format: CEF:Version|Device Vendor|Device Product|Device Version|Signature ID|Name|Severity|Extension # noqa: E501 Example output: CEF:0|Fortinet|FortiGate|7.0|API_OPERATION|FortiGate API Operation|5| act=POST dst=192.168.1.99 suser=api-token outcome=success ... """ # CEF severity mapping SEVERITY_MAP = { "GET": 2, # Low - read operations "POST": 5, # Medium - create operations "PUT": 5, # Medium - update operations "DELETE": 7, # High - delete operations }
[docs] def __init__( self, device_vendor: str = "Fortinet", device_product: str = "FortiGate", device_version: str = "7.0", ): """ Initialize CEF formatter Args: device_vendor: Vendor name device_product: Product name device_version: Product version """ self.device_vendor = device_vendor self.device_product = device_product self.device_version = device_version
[docs] def format(self, operation: dict[str, Any]) -> str: """Format as CEF string""" method = operation.get("method", "UNKNOWN") # endpoint and success available for future use # endpoint = operation.get("endpoint", "") # success = operation.get("success", False) # CEF header severity = self.SEVERITY_MAP.get(method, 5) signature_id = "API_OPERATION" name = f"FortiGate API {method} Operation" header = ( f"CEF:0|{self.device_vendor}|{self.device_product}|" f"{self.device_version}|{signature_id}|{name}|{severity}|" ) # CEF extensions (key-value pairs) extensions = [] # Map operation fields to CEF fields if "method" in operation: extensions.append(f"act={self._escape(operation['method'])}") if "host" in operation: extensions.append(f"dst={self._escape(operation['host'])}") if "endpoint" in operation: extensions.append(f"request={self._escape(operation['endpoint'])}") if "success" in operation: outcome = "success" if operation["success"] else "failure" extensions.append(f"outcome={outcome}") if "status_code" in operation: extensions.append( f"requestClientApplication={operation['status_code']}" ) if "request_id" in operation: extensions.append( f"requestContext={self._escape(operation['request_id'])}" ) if "vdom" in operation and operation["vdom"]: extensions.append(f"dvchost={self._escape(operation['vdom'])}") if "duration_ms" in operation: extensions.append(f"rt={operation['duration_ms']}") if "object_type" in operation: extensions.append( f"deviceCustomString1={self._escape(operation['object_type'])}" ) extensions.append("deviceCustomString1Label=ObjectType") if "object_name" in operation and operation["object_name"]: extensions.append( f"deviceCustomString2={self._escape(operation['object_name'])}" ) extensions.append("deviceCustomString2Label=ObjectName") # Add username (from user_context if available) user = "api-token" user_context = operation.get("user_context") or {} if user_context.get("username"): user = user_context["username"] extensions.append(f"suser={self._escape(user)}") return header + " ".join(extensions)
@staticmethod def _escape(value: str) -> str: """Escape special characters for CEF format""" if not isinstance(value, str): value = str(value) # Escape backslash, pipe, equals, and newlines return ( value.replace("\\", "\\\\") .replace("|", "\\|") .replace("=", "\\=") .replace("\n", "\\n") .replace("\r", "\\r") )