#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ProxMenux Security Manager Handles Proxmox firewall status, rules, and security tool detection. """ import os import json import subprocess import re import fcntl import threading from contextlib import contextmanager # ================================================================= # Proxmox Firewall Management # ================================================================= # Proxmox firewall config paths CLUSTER_FW = "/etc/pve/firewall/cluster.fw" HOST_FW_DIR = "/etc/pve/local" # host.fw is per-node @contextmanager def _exclusive_file_lock(path): """Hold an exclusive flock on `path` for the duration of the block. The read / modify / write pattern in `add_firewall_rule`, `edit_firewall_rule`, `delete_firewall_rule` and the jail.local writer was unsynchronised — two concurrent Flask threads doing add+add could each read the same content, modify in their own copy, and the second write would clobber the first. flock serialises across threads (and across processes) on the same path. Audit Tier 6 — security_manager locking ausente. """ parent = os.path.dirname(path) if parent: os.makedirs(parent, exist_ok=True) fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o640) try: fcntl.flock(fd, fcntl.LOCK_EX) yield finally: try: fcntl.flock(fd, fcntl.LOCK_UN) except Exception: pass os.close(fd) # Threading lock for `_lynis_audit_running` flag and similar in-process # state. flock guards on-disk state; this guards in-memory state. _state_lock = threading.Lock() # Match a real pve-firewall rule line: ` ...` where DIR is # IN/OUT/GROUP and ACTION is ACCEPT/DROP/REJECT/. We don't # enforce the full grammar — just enough that comments, blank lines, and # random malformed text don't get counted as rules when computing # rule_index. PVE itself rejects malformed rules, so they exist on disk # but never appear in `pve-firewall list` output → keeping our internal # index in sync with that list means skipping them here too. _PVE_RULE_LINE_RE = re.compile( r'^(?:IN|OUT|GROUP)\s+\S+', re.IGNORECASE, ) def _is_pve_rule_line(stripped): if not stripped or stripped.startswith('#') or stripped.startswith('['): return False return bool(_PVE_RULE_LINE_RE.match(stripped)) # Allowed shape for inputs that flow into fail2ban-client argv or are written # as INI section headers in /etc/fail2ban/jail.local. Bounded length, conservative # alphabet, and forced to START with an alphanumeric so a name like `--help` # cannot be smuggled past argv as an option flag. Also prevents newline injection # (`jail_name='ssh\n[DEFAULT]\nbantime=1\n['` would corrupt the DEFAULT section) # and quote/escape tricks. See audit Tier 1 #12b. _JAIL_NAME_RE = re.compile(r'^[A-Za-z0-9_][A-Za-z0-9_-]{0,63}$') # Whitelist for the `level` argument to firewall functions. The audit flagged # that an unconstrained value here could one day be extended to `vm` and become # a path traversal sink. See audit Tier 1 #12d. _FIREWALL_LEVELS = ('host', 'cluster') # Whitelist of L4 protocols accepted by Proxmox `pve-firewall` rules. Anything # outside this set should be rejected to avoid silent acceptance of bogus rules. # See audit Tier 1 #12d. _FIREWALL_PROTOCOLS = ('tcp', 'udp', 'icmp', 'icmpv6', 'igmp', 'esp', 'ah', 'ipv6-icmp') def _is_valid_jail_name(name): """Return True iff `name` is a safe jail name for fail2ban-client / jail.local.""" return isinstance(name, str) and bool(_JAIL_NAME_RE.match(name)) # Source / dest values written into host.fw / cluster.fw rule lines. Allows # IPs (1.2.3.4), CIDR (1.2.3.0/24), IPv6 (::1, fe80::/64), Proxmox ipset # references (+ipsetname), and named aliases (alpha-numeric + dot/dash/underscore). # Rejects whitespace, `#`, and any control character (including the `\n` / # `\r` / `\t` that would otherwise let an attacker inject a fresh rule line. # See audit Tier 1 #12c. _FW_SOURCE_DEST_RE = re.compile(r'^[A-Za-z0-9.:/_+\-]{1,128}$') # Linux interface names: alphanumerics, dot, dash, underscore. Capped at 16 # chars (Linux IFNAMSIZ). Rejects newlines and shell metacharacters. _FW_IFACE_RE = re.compile(r'^[A-Za-z0-9_.\-]{1,16}$') def _is_valid_fw_endpoint(value): """True if `value` is empty (optional) or matches a safe firewall endpoint.""" if value == "" or value is None: return True return isinstance(value, str) and bool(_FW_SOURCE_DEST_RE.match(value)) def _is_valid_fw_iface(value): """True if `value` is empty (optional) or a valid network interface name.""" if value == "" or value is None: return True return isinstance(value, str) and bool(_FW_IFACE_RE.match(value)) def _run_cmd(cmd, timeout=10): """Run a shell command and return (returncode, stdout, stderr)""" try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=timeout ) return result.returncode, result.stdout.strip(), result.stderr.strip() except subprocess.TimeoutExpired: return -1, "", "Command timed out" except FileNotFoundError: return -1, "", f"Command not found: {cmd[0]}" except Exception as e: return -1, "", str(e) def get_firewall_status(): """ Get the overall Proxmox firewall status. Returns dict with status info. """ result = { "pve_firewall_installed": False, "pve_firewall_active": False, "cluster_fw_enabled": False, "host_fw_enabled": False, "rules_count": 0, "rules": [], "monitor_port_open": False, } # Check if pve-firewall service exists rc, out, _ = _run_cmd(["systemctl", "is-active", "pve-firewall"]) result["pve_firewall_installed"] = rc == 0 or "inactive" in out or "active" in out result["pve_firewall_active"] = (rc == 0 and out == "active") # If not installed or inactive, check if the service unit exists if not result["pve_firewall_installed"]: rc2, _, _ = _run_cmd(["systemctl", "cat", "pve-firewall"]) result["pve_firewall_installed"] = rc2 == 0 # Parse cluster firewall config if os.path.isfile(CLUSTER_FW): try: with open(CLUSTER_FW, 'r') as f: content = f.read() # Check if firewall is enabled at cluster level for line in content.splitlines(): line = line.strip() if line.lower().startswith("enable:"): val = line.split(":", 1)[1].strip() result["cluster_fw_enabled"] = val == "1" break except Exception: pass # Parse host firewall config host_fw = os.path.join(HOST_FW_DIR, "host.fw") if os.path.isfile(host_fw): try: with open(host_fw, 'r') as f: content = f.read() for line in content.splitlines(): line = line.strip() if line.lower().startswith("enable:"): val = line.split(":", 1)[1].strip() result["host_fw_enabled"] = val == "1" break except Exception: pass # Get rules rules = _parse_firewall_rules() result["rules"] = rules result["rules_count"] = len(rules) # Check if port 8008 is allowed for rule in rules: dport = str(rule.get("dport", "")) if "8008" in dport and rule.get("action", "").upper() == "ACCEPT": result["monitor_port_open"] = True break return result def _parse_firewall_rules(): """Parse all firewall rules from cluster and host configs""" rules = [] rule_idx_by_file = {} # Track rule index per file for deletion for fw_file, source in [(CLUSTER_FW, "cluster"), (os.path.join(HOST_FW_DIR, "host.fw"), "host")]: if not os.path.isfile(fw_file): continue rule_idx_by_file[source] = 0 try: with open(fw_file, 'r') as f: content = f.read() in_rules = False section = "" for line in content.splitlines(): line = line.strip() if not line or line.startswith('#'): continue # Detect section headers if line.startswith('['): section_match = re.match(r'\[(\w+)\]', line) if section_match: section = section_match.group(1).upper() in_rules = section in ("RULES", "IN", "OUT") continue if in_rules or section in ("RULES", "IN", "OUT"): rule = _parse_rule_line(line, source, section) if rule: rule["rule_index"] = rule_idx_by_file[source] rules.append(rule) rule_idx_by_file[source] += 1 # else: malformed line — don't bump the index. The # delete/edit paths use the same `_is_pve_rule_line` # gate so this stays consistent across read and write. except Exception: pass return rules def _parse_rule_line(line, source, section): """Parse a single firewall rule line""" # Proxmox rule format: |ACTION MACRO(params) -option value ... # or: IN/OUT ACTION -p proto -dport port -source addr parts = line.split() if len(parts) < 2: return None rule = { "raw": line, "source_file": source, "section": section, } idx = 0 # Direction if parts[0].upper() in ("IN", "OUT"): rule["direction"] = parts[0].upper() idx = 1 elif section in ("IN",): rule["direction"] = "IN" elif section in ("OUT",): rule["direction"] = "OUT" if idx < len(parts): rule["action"] = parts[idx].upper() idx += 1 # Parse options while idx < len(parts): opt = parts[idx] if opt.startswith("-") and idx + 1 < len(parts): key = opt.lstrip("-") val = parts[idx + 1] rule[key] = val idx += 2 else: idx += 1 return rule def add_firewall_rule(direction="IN", action="ACCEPT", protocol="tcp", dport="", sport="", source="", dest="", iface="", comment="", level="host"): """ Add a custom firewall rule to host or cluster firewall config. Returns (success, message) """ # Validate inputs action = action.upper() if action not in ("ACCEPT", "DROP", "REJECT"): return False, f"Invalid action: {action}. Must be ACCEPT, DROP, or REJECT" direction = direction.upper() if direction not in ("IN", "OUT"): return False, f"Invalid direction: {direction}. Must be IN or OUT" if level not in _FIREWALL_LEVELS: return False, f"Invalid level: {level}. Must be one of {_FIREWALL_LEVELS}" # Per-field input hardening — rejects newline / `#` / shell metas which would # otherwise let a caller inject extra rule lines into host.fw / cluster.fw. # See audit Tier 1 #12c. if not _is_valid_fw_endpoint(source): return False, "Invalid source (only IP/CIDR/ipset/alias chars allowed)" if not _is_valid_fw_endpoint(dest): return False, "Invalid destination (only IP/CIDR/ipset/alias chars allowed)" if not _is_valid_fw_iface(iface): return False, "Invalid interface name" # Build rule line parts = [direction, action] if protocol: proto = protocol.lower() if proto not in _FIREWALL_PROTOCOLS: return False, f"Invalid protocol: {protocol}. Must be one of {_FIREWALL_PROTOCOLS}" parts.extend(["-p", proto]) if dport: # Validate port if not re.match(r'^[\d:,]+$', dport): return False, f"Invalid destination port: {dport}" parts.extend(["-dport", dport]) if sport: if not re.match(r'^[\d:,]+$', sport): return False, f"Invalid source port: {sport}" parts.extend(["-sport", sport]) if source: parts.extend(["-source", source]) if dest: parts.extend(["-dest", dest]) if iface: parts.extend(["-i", iface]) parts.extend(["-log", "nolog"]) if comment: # Sanitize comment. The previous regex used `\s` in the negation which # accepts `\n` / `\r` — letting a malicious comment terminate the rule # line and inject a fresh one. We use a literal space in the negation # so newlines / tabs are stripped. See audit Tier 1 #12c. safe_comment = re.sub(r'[^\w \-._/():]', '', comment) parts.append(f"# {safe_comment}") rule_line = " ".join(parts) # Determine target file if level == "cluster": fw_file = CLUSTER_FW else: fw_file = os.path.join(HOST_FW_DIR, "host.fw") try: with _exclusive_file_lock(fw_file): content = "" has_rules_section = False if os.path.isfile(fw_file): with open(fw_file, 'r') as f: content = f.read() has_rules_section = "[RULES]" in content if has_rules_section: lines = content.splitlines() new_lines = [] inserted = False for line in lines: new_lines.append(line) if not inserted and line.strip() == "[RULES]": new_lines.append(rule_line) inserted = True content = "\n".join(new_lines) + "\n" else: if content and not content.endswith("\n"): content += "\n" content += "\n[RULES]\n" content += rule_line + "\n" os.makedirs(os.path.dirname(fw_file), exist_ok=True) with open(fw_file, 'w') as f: f.write(content) _run_cmd(["pve-firewall", "reload"]) return True, f"Firewall rule added: {direction} {action} {protocol}{':' + dport if dport else ''}" except PermissionError: return False, "Permission denied. Cannot write to firewall config." except Exception as e: return False, f"Failed to add firewall rule: {str(e)}" def edit_firewall_rule(rule_index, level="host", direction="IN", action="ACCEPT", protocol="tcp", dport="", sport="", source="", dest="", iface="", comment=""): """ Edit an existing firewall rule by replacing it in-place. Deletes the old rule at rule_index and inserts the new one at the same position. Returns (success, message) """ # Validate inputs action = action.upper() if action not in ("ACCEPT", "DROP", "REJECT"): return False, f"Invalid action: {action}. Must be ACCEPT, DROP, or REJECT" direction = direction.upper() if direction not in ("IN", "OUT"): return False, f"Invalid direction: {direction}. Must be IN or OUT" if level not in _FIREWALL_LEVELS: return False, f"Invalid level: {level}. Must be one of {_FIREWALL_LEVELS}" # See add_firewall_rule for the same rationale — keep both entry points # consistent so they cannot be exploited via newline / shell-metachar # injection. Audit Tier 1 #12c. if not _is_valid_fw_endpoint(source): return False, "Invalid source (only IP/CIDR/ipset/alias chars allowed)" if not _is_valid_fw_endpoint(dest): return False, "Invalid destination (only IP/CIDR/ipset/alias chars allowed)" if not _is_valid_fw_iface(iface): return False, "Invalid interface name" # Build new rule line parts = [direction, action] if protocol: proto = protocol.lower() if proto not in _FIREWALL_PROTOCOLS: return False, f"Invalid protocol: {protocol}. Must be one of {_FIREWALL_PROTOCOLS}" parts.extend(["-p", proto]) if dport: if not re.match(r'^[\d:,]+$', dport): return False, f"Invalid destination port: {dport}" parts.extend(["-dport", dport]) if sport: if not re.match(r'^[\d:,]+$', sport): return False, f"Invalid source port: {sport}" parts.extend(["-sport", sport]) if source: parts.extend(["-source", source]) # `dest` was previously dropped silently from edit_firewall_rule — that's # the registered audit issue "edit_firewall_rule IGNORA dest". Honor it. if dest: parts.extend(["-dest", dest]) if iface: parts.extend(["-i", iface]) parts.extend(["-log", "nolog"]) if comment: # Same fix as add_firewall_rule: literal space, no `\s`, so newlines # cannot escape the comment and inject another rule. safe_comment = re.sub(r'[^\w \-._/():]', '', comment) parts.append(f"# {safe_comment}") new_rule_line = " ".join(parts) # Determine target file if level == "cluster": fw_file = CLUSTER_FW else: fw_file = os.path.join(HOST_FW_DIR, "host.fw") if not os.path.isfile(fw_file): return False, "Firewall config file not found" try: with _exclusive_file_lock(fw_file): with open(fw_file, 'r') as f: content = f.read() lines = content.splitlines() new_lines = [] in_rules = False current_rule_idx = 0 replaced = False for line in lines: stripped = line.strip() if stripped.startswith('['): section_match = re.match(r'\[(\w+)\]', stripped) if section_match: section = section_match.group(1).upper() in_rules = section in ("RULES", "IN", "OUT") # Only count lines that look like real PVE firewall rules # (` ...`). Random malformed lines that pve- # firewall would skip used to bump our index, which made # "delete rule N" hit the wrong rule. Audit Tier 6 — # delete/edit_firewall_rule desync de índices. if in_rules and stripped and _is_pve_rule_line(stripped): if current_rule_idx == rule_index: new_lines.append(new_rule_line) replaced = True current_rule_idx += 1 continue current_rule_idx += 1 new_lines.append(line) if not replaced: return False, f"Rule index {rule_index} not found" with open(fw_file, 'w') as f: f.write("\n".join(new_lines) + "\n") _run_cmd(["pve-firewall", "reload"]) return True, f"Firewall rule updated: {direction} {action} {protocol}{':' + dport if dport else ''}" except PermissionError: return False, "Permission denied. Cannot modify firewall config." except Exception as e: return False, f"Failed to edit rule: {str(e)}" def delete_firewall_rule(rule_index, level="host"): """ Delete a firewall rule by index from host or cluster config. The index corresponds to the order of rules in [RULES] section. Returns (success, message) """ if level not in _FIREWALL_LEVELS: return False, f"Invalid level: {level}. Must be one of {_FIREWALL_LEVELS}" if level == "cluster": fw_file = CLUSTER_FW else: fw_file = os.path.join(HOST_FW_DIR, "host.fw") if not os.path.isfile(fw_file): return False, "Firewall config file not found" try: with _exclusive_file_lock(fw_file): with open(fw_file, 'r') as f: content = f.read() lines = content.splitlines() new_lines = [] in_rules = False current_rule_idx = 0 removed_rule = None for line in lines: stripped = line.strip() if stripped.startswith('['): section_match = re.match(r'\[(\w+)\]', stripped) if section_match: section = section_match.group(1).upper() in_rules = section in ("RULES", "IN", "OUT") # Same rule-shape gate as edit_firewall_rule above — skip # malformed lines so the index stays aligned with the # rules pve-firewall actually reports. if in_rules and stripped and _is_pve_rule_line(stripped): if current_rule_idx == rule_index: removed_rule = stripped current_rule_idx += 1 continue # Skip this line (delete it) current_rule_idx += 1 new_lines.append(line) if removed_rule is None: return False, f"Rule index {rule_index} not found" with open(fw_file, 'w') as f: f.write("\n".join(new_lines) + "\n") _run_cmd(["pve-firewall", "reload"]) return True, f"Firewall rule deleted: {removed_rule}" except PermissionError: return False, "Permission denied. Cannot modify firewall config." except Exception as e: return False, f"Failed to delete rule: {str(e)}" def add_monitor_port_rule(): """ Add a firewall rule to allow port 8008 (ProxMenux Monitor) on the host. Returns (success, message) """ host_fw = os.path.join(HOST_FW_DIR, "host.fw") # Check if rule already exists status = get_firewall_status() if status.get("monitor_port_open"): return True, "Port 8008 is already allowed in the firewall" try: content = "" has_rules_section = False if os.path.isfile(host_fw): with open(host_fw, 'r') as f: content = f.read() has_rules_section = "[RULES]" in content rule_line = "IN ACCEPT -p tcp -dport 8008 -log nolog # ProxMenux Monitor" if has_rules_section: # Add rule after [RULES] section header lines = content.splitlines() new_lines = [] inserted = False for line in lines: new_lines.append(line) if not inserted and line.strip() == "[RULES]": new_lines.append(rule_line) inserted = True content = "\n".join(new_lines) + "\n" else: # Add [RULES] section if content and not content.endswith("\n"): content += "\n" content += "\n[RULES]\n" content += rule_line + "\n" with open(host_fw, 'w') as f: f.write(content) # Reload firewall _run_cmd(["pve-firewall", "reload"]) return True, "Firewall rule added: port 8008 (TCP) allowed for ProxMenux Monitor" except PermissionError: return False, "Permission denied. Cannot write to firewall config." except Exception as e: return False, f"Failed to add firewall rule: {str(e)}" def remove_monitor_port_rule(): """ Remove the ProxMenux Monitor port 8008 rule from host firewall. Returns (success, message) """ host_fw = os.path.join(HOST_FW_DIR, "host.fw") if not os.path.isfile(host_fw): return True, "No host firewall config found" try: with open(host_fw, 'r') as f: lines = f.readlines() new_lines = [] removed = False for line in lines: if "8008" in line and "ProxMenux" in line: removed = True continue new_lines.append(line) if not removed: return True, "No ProxMenux Monitor rule found to remove" with open(host_fw, 'w') as f: f.writelines(new_lines) _run_cmd(["pve-firewall", "reload"]) return True, "ProxMenux Monitor firewall rule removed" except Exception as e: return False, f"Failed to remove firewall rule: {str(e)}" def enable_firewall(level="host"): """ Enable the Proxmox firewall at host or cluster level. Returns (success, message) """ if level not in _FIREWALL_LEVELS: return False, f"Invalid level: {level}. Must be one of {_FIREWALL_LEVELS}" if level == "cluster": return _set_firewall_enabled(CLUSTER_FW, True) else: host_fw = os.path.join(HOST_FW_DIR, "host.fw") return _set_firewall_enabled(host_fw, True) def disable_firewall(level="host"): """ Disable the Proxmox firewall at host or cluster level. Returns (success, message) """ if level not in _FIREWALL_LEVELS: return False, f"Invalid level: {level}. Must be one of {_FIREWALL_LEVELS}" if level == "cluster": return _set_firewall_enabled(CLUSTER_FW, False) else: host_fw = os.path.join(HOST_FW_DIR, "host.fw") return _set_firewall_enabled(host_fw, False) def _set_firewall_enabled(fw_file, enabled): """Set enable: 1 or enable: 0 in firewall config""" try: content = "" if os.path.isfile(fw_file): with open(fw_file, 'r') as f: content = f.read() enable_val = "1" if enabled else "0" has_options = "[OPTIONS]" in content has_enable = False lines = content.splitlines() new_lines = [] in_options = False for line in lines: stripped = line.strip() if stripped.startswith("["): in_options = stripped == "[OPTIONS]" if in_options and stripped.lower().startswith("enable:"): new_lines.append(f"enable: {enable_val}") has_enable = True else: new_lines.append(line) if not has_enable: if has_options: # Add enable line after [OPTIONS] final_lines = [] for line in new_lines: final_lines.append(line) if line.strip() == "[OPTIONS]": final_lines.append(f"enable: {enable_val}") new_lines = final_lines else: # Add [OPTIONS] section at the beginning new_lines.insert(0, "[OPTIONS]") new_lines.insert(1, f"enable: {enable_val}") new_lines.insert(2, "") # Ensure parent directory exists os.makedirs(os.path.dirname(fw_file), exist_ok=True) with open(fw_file, 'w') as f: f.write("\n".join(new_lines) + "\n") # Reload or start the firewall service if enabled: _run_cmd(["systemctl", "enable", "pve-firewall"]) _run_cmd(["systemctl", "start", "pve-firewall"]) _run_cmd(["pve-firewall", "reload"]) state = "enabled" if enabled else "disabled" level = "cluster" if fw_file == CLUSTER_FW else "host" return True, f"Firewall {state} at {level} level" except PermissionError: return False, "Permission denied. Cannot modify firewall config." except Exception as e: return False, f"Failed to modify firewall: {str(e)}" # ================================================================= # Security Tools Detection # ================================================================= # ================================================================= # Fail2Ban Detailed Management # ================================================================= def get_fail2ban_details(): """ Get detailed Fail2Ban info: per-jail banned IPs, ban times, etc. Returns dict with detailed jail information. """ result = { "installed": False, "active": False, "version": "", "jails": [], } rc, out, _ = _run_cmd(["fail2ban-client", "--version"]) if rc != 0: return result result["installed"] = True result["version"] = out.split("\n")[0].strip() if out else "" rc2, out2, _ = _run_cmd(["systemctl", "is-active", "fail2ban"]) result["active"] = (rc2 == 0 and out2 == "active") if not result["active"]: return result # Get jail list rc3, out3, _ = _run_cmd(["fail2ban-client", "status"]) jail_names = [] if rc3 == 0: for line in out3.splitlines(): if "Jail list:" in line: jails_str = line.split(":", 1)[1].strip() jail_names = [j.strip() for j in jails_str.split(",") if j.strip()] # Get detailed info per jail for jail_name in jail_names: jail_info = { "name": jail_name, "currently_failed": 0, "total_failed": 0, "currently_banned": 0, "total_banned": 0, "banned_ips": [], "findtime": "", "bantime": "", "maxretry": "", } rc4, out4, _ = _run_cmd(["fail2ban-client", "status", jail_name]) if rc4 == 0: for line in out4.splitlines(): line = line.strip() if "Currently failed:" in line: try: jail_info["currently_failed"] = int(line.split(":", 1)[1].strip()) except ValueError: pass elif "Total failed:" in line: try: jail_info["total_failed"] = int(line.split(":", 1)[1].strip()) except ValueError: pass elif "Currently banned:" in line: try: jail_info["currently_banned"] = int(line.split(":", 1)[1].strip()) except ValueError: pass elif "Total banned:" in line: try: jail_info["total_banned"] = int(line.split(":", 1)[1].strip()) except ValueError: pass elif "Banned IP list:" in line: ips_str = line.split(":", 1)[1].strip() if ips_str: raw_ips = [ip.strip() for ip in ips_str.split() if ip.strip()] jail_info["banned_ips"] = [ {"ip": ip, "type": classify_ip(ip)} for ip in raw_ips ] # Get jail config values for key in ["findtime", "bantime", "maxretry"]: rc5, out5, _ = _run_cmd(["fail2ban-client", "get", jail_name, key]) if rc5 == 0 and out5: jail_info[key] = out5.strip() result["jails"].append(jail_info) return result def classify_ip(ip_address): """ Classify an IP address as 'local' or 'external'. Local: 10.x.x.x, 172.16-31.x.x, 192.168.x.x, 127.x.x.x, fd00::/8, fe80::/10, ::1 """ if not ip_address: return "unknown" ip = ip_address.strip() # IPv4 private ranges if ip.startswith("10.") or ip.startswith("127.") or ip.startswith("192.168."): return "local" if ip.startswith("172."): try: second_octet = int(ip.split(".")[1]) if 16 <= second_octet <= 31: return "local" except (ValueError, IndexError): pass # IPv6 private/link-local ip_lower = ip.lower() if ip_lower == "::1" or ip_lower.startswith("fd") or ip_lower.startswith("fe80"): return "local" return "external" def update_jail_config(jail_name, maxretry=None, bantime=None, findtime=None): """ Update Fail2Ban jail configuration (maxretry, bantime, findtime). Uses fail2ban-client set commands for live changes, and also writes to the jail.local file for persistence. bantime = -1 means permanent ban. Returns (success, message) """ if not _is_valid_jail_name(jail_name): return False, "Invalid jail name" changes = [] errors = [] # Apply live changes via fail2ban-client if maxretry is not None: try: val = int(maxretry) if val < 1: return False, "Max retries must be at least 1" rc, _, err = _run_cmd(["fail2ban-client", "set", jail_name, "maxretry", str(val)]) if rc == 0: changes.append(f"maxretry={val}") else: errors.append(f"maxretry: {err}") except ValueError: errors.append("maxretry must be a number") if bantime is not None: try: val = int(bantime) # -1 = permanent, otherwise must be positive if val < -1 or val == 0: return False, "Ban time must be positive seconds or -1 for permanent" rc, _, err = _run_cmd(["fail2ban-client", "set", jail_name, "bantime", str(val)]) if rc == 0: changes.append(f"bantime={val}") else: errors.append(f"bantime: {err}") except ValueError: errors.append("bantime must be a number") if findtime is not None: try: val = int(findtime) if val < 1: return False, "Find time must be positive" rc, _, err = _run_cmd(["fail2ban-client", "set", jail_name, "findtime", str(val)]) if rc == 0: changes.append(f"findtime={val}") else: errors.append(f"findtime: {err}") except ValueError: errors.append("findtime must be a number") # Also persist to jail.local so changes survive restart if changes: _persist_jail_config(jail_name, maxretry, bantime, findtime) if errors: return False, "Errors: " + "; ".join(errors) if changes: return True, f"Jail '{jail_name}' updated: {', '.join(changes)}" return False, "No changes specified" def _persist_jail_config(jail_name, maxretry=None, bantime=None, findtime=None): """ Write jail config changes to /etc/fail2ban/jail.local for persistence. `jail_name` is interpolated into an INI section header `[jail_name]`. Any callers should already have validated the name with `_is_valid_jail_name`, but we re-check defensively in case a future code path skips it. """ if not _is_valid_jail_name(jail_name): return # silently refuse malformed names; never write to disk jail_local = "/etc/fail2ban/jail.local" try: content = "" if os.path.isfile(jail_local): with open(jail_local, 'r') as f: content = f.read() lines = content.splitlines() if content else [] # Find or create the jail section jail_section = f"[{jail_name}]" section_start = -1 section_end = len(lines) for i, line in enumerate(lines): if line.strip() == jail_section: section_start = i elif section_start >= 0 and line.strip().startswith("[") and i > section_start: section_end = i break # Build settings to update settings = {} if maxretry is not None: settings["maxretry"] = str(int(maxretry)) if bantime is not None: settings["bantime"] = str(int(bantime)) if findtime is not None: settings["findtime"] = str(int(findtime)) if section_start >= 0: # Update existing section for key, val in settings.items(): found = False for i in range(section_start + 1, section_end): stripped = lines[i].strip() if stripped.startswith(f"{key}") and "=" in stripped: lines[i] = f"{key} = {val}" found = True break if not found: lines.insert(section_start + 1, f"{key} = {val}") section_end += 1 else: # Create new section if lines and lines[-1].strip(): lines.append("") lines.append(jail_section) for key, val in settings.items(): lines.append(f"{key} = {val}") with open(jail_local, 'w') as f: f.write("\n".join(lines) + "\n") except Exception: pass # Best effort persistence def apply_missing_jails(): """ Check for missing Fail2Ban jails (proxmox, proxmenux) and create them. Returns (success, message, applied_jails). """ applied = [] errors = [] # Check which jails are currently active rc, out, _ = _run_cmd(["fail2ban-client", "status"]) if rc != 0: return False, "Cannot communicate with fail2ban-client", [] current_jails = [] for line in out.splitlines(): if "Jail list:" in line: jails_str = line.split(":", 1)[1].strip() current_jails = [j.strip().lower() for j in jails_str.split(",") if j.strip()] # --- Proxmox jail (port 8006) --- # Fail2Ban's systemd backend can't reliably read pvedaemon worker entries # in real-time. We use a systemd service that tails the journal to a file # and fail2ban monitors that file with backend=auto. if "proxmox" not in current_jails: try: # Create the auth logger service if not present logger_service = "/etc/systemd/system/proxmox-auth-logger.service" if not os.path.isfile(logger_service): service_content = """[Unit] Description=Proxmox Auth Logger for Fail2Ban After=pvedaemon.service PartOf=fail2ban.service [Service] Type=simple ExecStart=/bin/bash -c 'journalctl -f _SYSTEMD_UNIT=pvedaemon.service -o short-iso --no-pager >> /var/log/proxmox-auth.log' Restart=always RestartSec=5 [Install] WantedBy=multi-user.target """ with open(logger_service, "w") as f: f.write(service_content) # Create log file log_file = "/var/log/proxmox-auth.log" if not os.path.isfile(log_file): with open(log_file, "w") as f: pass os.chmod(log_file, 0o640) _run_cmd(["systemctl", "daemon-reload"]) _run_cmd(["systemctl", "enable", "--now", "proxmox-auth-logger.service"]) # Create filter (only if user hasn't placed their own version) filter_path = "/etc/fail2ban/filter.d/proxmox.conf" if not os.path.isfile(filter_path): filter_content = """[Definition] failregex = authentication (failure|error); rhost=(::ffff:)? user=.* msg=.* ignoreregex = datepattern = ^%%Y-%%m-%%dT%%H:%%M:%%S """ with open(filter_path, "w") as f: f.write(filter_content) # Create jail (only if not already present on disk). The user # may have deliberately disabled it (`enabled = false`) while # keeping their other customisations; the previous code re- # enabled and clobbered everything every run. Audit Tier 6 — # `apply_missing_jails` sobrescribe configs personalizadas. jail_path = "/etc/fail2ban/jail.d/proxmox.conf" if not os.path.isfile(jail_path): jail_content = """[proxmox] enabled = true port = 8006 filter = proxmox backend = auto logpath = /var/log/proxmox-auth.log maxretry = 3 bantime = 3600 findtime = 600 """ with open(jail_path, "w") as f: f.write(jail_content) applied.append("proxmox") except Exception as e: errors.append(f"proxmox: {str(e)}") # --- ProxMenux Monitor jail (port 8008 + reverse proxy) --- # Uses backend=auto with logpath because the Flask app writes # auth failures directly to this file (not via syslog/journal). if "proxmenux" not in current_jails: try: # Create filter (preserve any user-customised version on disk) filter_path = "/etc/fail2ban/filter.d/proxmenux.conf" if not os.path.isfile(filter_path): filter_content = """[Definition] failregex = ^.*proxmenux-auth: authentication failure; rhost= user=.*$ ignoreregex = datepattern = ^%%Y-%%m-%%d %%H:%%M:%%S """ with open(filter_path, "w") as f: f.write(filter_content) # Create jail only if not already present (same rationale as # the proxmox jail above). jail_path = "/etc/fail2ban/jail.d/proxmenux.conf" if not os.path.isfile(jail_path): jail_content = """[proxmenux] enabled = true port = 8008,http,https filter = proxmenux backend = auto logpath = /var/log/proxmenux-auth.log maxretry = 3 bantime = 3600 findtime = 600 """ with open(jail_path, "w") as f: f.write(jail_content) # Ensure log file exists if not os.path.isfile("/var/log/proxmenux-auth.log"): with open("/var/log/proxmenux-auth.log", "w") as f: pass os.chmod("/var/log/proxmenux-auth.log", 0o640) applied.append("proxmenux") except Exception as e: errors.append(f"proxmenux: {str(e)}") if not applied and not errors: return True, "All jails are already configured", [] if applied: # Restart fail2ban to load new jails _run_cmd(["systemctl", "restart", "fail2ban"]) import time time.sleep(2) if errors: return False, "Errors: " + "; ".join(errors), applied return True, f"Applied jails: {', '.join(applied)}", applied def unban_ip(jail_name, ip_address): """ Unban a specific IP from a Fail2Ban jail. Returns (success, message) """ if not _is_valid_jail_name(jail_name): return False, "Invalid jail name" if not ip_address: return False, "IP address is required" # Validate IP format (basic check) if not re.match(r'^[\d.:a-fA-F]+$', ip_address): return False, f"Invalid IP address format: {ip_address}" rc, out, err = _run_cmd(["fail2ban-client", "set", jail_name, "unbanip", ip_address]) if rc == 0: return True, f"IP {ip_address} has been unbanned from jail '{jail_name}'" else: return False, f"Failed to unban IP: {err or out}" def get_fail2ban_recent_activity(lines=50): """ Get recent Fail2Ban log activity (bans and unbans). Returns list of recent events. """ events = [] log_file = "/var/log/fail2ban.log" if not os.path.isfile(log_file): return events # Coerce + clamp `lines`. The caller (Flask route) passed it through # without bounds checking, so a request with `?lines=999999999` made # `tail` read most of `/var/log/fail2ban.log` and stuffed it into a # response. Audit Tier 6 — `get_fail2ban_recent_activity` permite # `lines` arbitrario. try: lines_int = int(lines) except (TypeError, ValueError): lines_int = 50 lines_int = max(1, min(lines_int, 1000)) try: # Read last N lines using tail rc, out, _ = _run_cmd(["tail", f"-{lines_int}", log_file], timeout=5) if rc != 0 or not out: return events for line in out.splitlines(): event = None # Parse ban events ban_match = re.search( r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})[,\d]*\s+.*\[(\w+)\]\s+Ban\s+([\d.:a-fA-F]+)', line ) if ban_match: event = { "timestamp": ban_match.group(1), "jail": ban_match.group(2), "ip": ban_match.group(3), "action": "ban", } # Parse unban events if not event: unban_match = re.search( r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})[,\d]*\s+.*\[(\w+)\]\s+Unban\s+([\d.:a-fA-F]+)', line ) if unban_match: event = { "timestamp": unban_match.group(1), "jail": unban_match.group(2), "ip": unban_match.group(3), "action": "unban", } # Parse found (failed attempt) events if not event: found_match = re.search( r'(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})[,\d]*\s+.*\[(\w+)\]\s+Found\s+([\d.:a-fA-F]+)', line ) if found_match: event = { "timestamp": found_match.group(1), "jail": found_match.group(2), "ip": found_match.group(3), "action": "found", } if event: events.append(event) # Return most recent first events.reverse() except Exception: pass return events def detect_security_tools(): """ Detect installed security tools on the system. Returns dict with tool status info. """ tools = {} # Fail2Ban tools["fail2ban"] = _detect_fail2ban() # Lynis tools["lynis"] = _detect_lynis() return tools def _detect_fail2ban(): """Detect Fail2Ban installation and status""" info = { "installed": False, "active": False, "version": "", "jails": [], "banned_ips_count": 0, } rc, out, _ = _run_cmd(["fail2ban-client", "--version"]) if rc == 0: info["installed"] = True info["version"] = out.split("\n")[0].strip() if out else "" # Check service status rc2, out2, _ = _run_cmd(["systemctl", "is-active", "fail2ban"]) info["active"] = (rc2 == 0 and out2 == "active") if info["active"]: # Get jails rc3, out3, _ = _run_cmd(["fail2ban-client", "status"]) if rc3 == 0: for line in out3.splitlines(): if "Jail list:" in line: jails_str = line.split(":", 1)[1].strip() info["jails"] = [j.strip() for j in jails_str.split(",") if j.strip()] # Count banned IPs across all jails total_banned = 0 for jail in info["jails"]: rc4, out4, _ = _run_cmd(["fail2ban-client", "status", jail]) if rc4 == 0: for line in out4.splitlines(): if "Currently banned:" in line: try: count = int(line.split(":", 1)[1].strip()) total_banned += count except ValueError: pass info["banned_ips_count"] = total_banned return info def _find_lynis_cmd(): """Find the lynis binary path""" for path in ["/usr/local/bin/lynis", "/opt/lynis/lynis", "/usr/bin/lynis"]: if os.path.isfile(path) and os.access(path, os.X_OK): return path return None def _detect_lynis(): """Detect Lynis installation and status""" info = { "installed": False, "version": "", "last_scan": None, "hardening_index": None, } lynis_cmd = _find_lynis_cmd() if lynis_cmd: info["installed"] = True rc, out, _ = _run_cmd([lynis_cmd, "show", "version"]) if rc == 0: info["version"] = out.strip() # Check for last scan report - use full parser for accurate data report = parse_lynis_report() if report: info["last_scan"] = report.get("datetime_start", None) info["hardening_index"] = report.get("hardening_index", None) else: # Fallback: quick read of report.dat report_file = "/var/log/lynis-report.dat" if os.path.isfile(report_file): try: with open(report_file, 'r') as f: for line in f: if line.startswith("report_datetime_start="): info["last_scan"] = line.split("=", 1)[1].strip() elif line.startswith("hardening_index="): try: info["hardening_index"] = int(line.split("=", 1)[1].strip()) except ValueError: pass except Exception: pass return info # Track running audit _lynis_audit_running = False _lynis_audit_progress = "" def run_lynis_audit(): """ Run lynis audit system in the background. Returns (success, message). """ global _lynis_audit_running, _lynis_audit_progress # Guard the check-and-set under `_state_lock` — without it two Flask # threads racing into `run_lynis_audit` can both see the flag as # False, then both set it True, and both spawn a Lynis subprocess. # Audit Tier 6 — `_lynis_audit_running` global sin lock. with _state_lock: if _lynis_audit_running: return False, "An audit is already running" lynis_cmd = _find_lynis_cmd() if not lynis_cmd: return False, "Lynis is not installed" _lynis_audit_running = True _lynis_audit_progress = "starting" import threading def _run_audit(): global _lynis_audit_running, _lynis_audit_progress try: _lynis_audit_progress = "running" # Remove old report so lynis creates a fresh one report_file = "/var/log/lynis-report.dat" if os.path.isfile(report_file): os.remove(report_file) # Capture full formatted output. Lynis suppresses its nice # formatted output ([+] sections) when stdout is not a tty. # Use 'script' to simulate a terminal so Lynis outputs everything. output_log = "/var/log/lynis-output.log" rc = -1 out = "" err = "" # Method 1: Use 'script -qc' (simulates tty) try: script_cmd = [ "script", "-qec", f"{lynis_cmd} audit system --no-colors --quick", output_log ] result = subprocess.run( script_cmd, capture_output=True, text=True, timeout=600, env={**os.environ, "TERM": "dumb"} ) rc = result.returncode out = result.stdout.strip() err = result.stderr.strip() except Exception: pass # If script failed or output file is too small, try direct method output_ok = ( os.path.isfile(output_log) and os.path.getsize(output_log) > 500 ) if not output_ok: try: rc, out, err = _run_cmd( [lynis_cmd, "audit", "system", "--no-colors", "--quick"], timeout=600 ) if out: with open(output_log, "w") as fout: fout.write(out) except Exception: pass # Clean ALL ANSI/terminal escape codes and control chars from output if os.path.isfile(output_log): try: import re as _re with open(output_log, 'r', errors='replace') as fout: raw = fout.read() # Remove ALL ANSI escape sequences comprehensively: # CSI sequences: \x1b[ ... (letter) e.g. \x1b[0m, \x1b[?25h cleaned = _re.sub(r'\x1b\[[\x20-\x3f]*[\x40-\x7e]', '', raw) # OSC sequences: \x1b] ... \x07 or \x1b\\ cleaned = _re.sub(r'\x1b\].*?(?:\x07|\x1b\\)', '', cleaned) # Other escape sequences: \x1b followed by one char cleaned = _re.sub(r'\x1b[\x20-\x7e]', '', cleaned) # Remove remaining control characters except \n and \t cleaned = _re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', cleaned) # Remove carriage returns cleaned = cleaned.replace('\r', '') # Remove 'Script started/done' lines added by script cmd lines = cleaned.splitlines() lines = [l for l in lines if not l.strip().startswith('Script started') and not l.strip().startswith('Script done')] # Remove empty lines at start/end while lines and not lines[0].strip(): lines.pop(0) while lines and not lines[-1].strip(): lines.pop() cleaned = '\n'.join(lines) with open(output_log, 'w') as fout: fout.write(cleaned) except Exception: pass # Lynis returns the hardening index as exit code (e.g. 65) # Any non-negative code means the audit ran successfully if rc >= 0: _lynis_audit_progress = "completed" else: _lynis_audit_progress = f"error: {err[:200] if err else 'unknown error'}" except Exception as e: _lynis_audit_progress = f"error: {str(e)}" finally: _lynis_audit_running = False t = threading.Thread(target=_run_audit, daemon=True) t.start() return True, "Audit started" def get_lynis_audit_status(): """Get current audit status""" return { "running": _lynis_audit_running, "progress": _lynis_audit_progress, } def parse_lynis_report(): """ Parse /var/log/lynis-report.dat into structured report data. Also enriches with data from lynis.log when report.dat is sparse. Returns a dict with all audit findings. """ report_file = "/var/log/lynis-report.dat" output_file = "/var/log/lynis-output.log" # Need at least one data source if not os.path.isfile(report_file) and not os.path.isfile(output_file): return None report = { "datetime_start": "", "datetime_end": "", "lynis_version": "", "os_name": "", "os_version": "", "os_fullname": "", "hostname": "", "hardening_index": None, "tests_performed": 0, "warnings": [], "suggestions": [], "categories": {}, "installed_packages": 0, "kernel_version": "", "firewall_active": False, "malware_scanner": False, } # Collect all raw key-value pairs first for flexible matching raw_data = {} warnings_raw = [] suggestions_raw = [] if os.path.isfile(report_file): try: with open(report_file, 'r') as f: for line in f: line = line.strip() if not line or line.startswith("#") or line.startswith("["): continue if "=" not in line: continue key, _, value = line.partition("=") key = key.strip() value = value.strip() if key == "warning[]": warnings_raw.append(value) elif key == "suggestion[]": suggestions_raw.append(value) else: # Last value wins (some keys appear multiple times) raw_data[key] = value except Exception: pass # Continue with output.log data # Map known fields (Lynis uses varied naming across versions) report["datetime_start"] = raw_data.get("report_datetime_start", "") report["datetime_end"] = raw_data.get("report_datetime_end", "") report["lynis_version"] = raw_data.get("lynis_version", "") report["hostname"] = raw_data.get("hostname", "") # OS name - try multiple fields report["os_name"] = (raw_data.get("os_name", "") or raw_data.get("os", "") or raw_data.get("os_fullname", "")) report["os_version"] = (raw_data.get("os_version", "") or raw_data.get("os_version_id", "")) report["os_fullname"] = raw_data.get("os_fullname", "") # Kernel - try multiple field names report["kernel_version"] = (raw_data.get("os_kernel_version_full", "") or raw_data.get("os_kernel_version", "") or raw_data.get("linux_kernel_version", "") or raw_data.get("linux_version", "") or raw_data.get("os_kernelversion_full", "") or raw_data.get("os_kernelversion", "")) # Hardening index for k in ["hardening_index", "hpindex", "hp_index"]: if k in raw_data: try: report["hardening_index"] = int(raw_data[k]) break except ValueError: pass # Tests performed for k in ["tests_performed", "ctests_performed", "total_tests"]: if k in raw_data: try: val = int(raw_data[k]) if val > report["tests_performed"]: report["tests_performed"] = val except ValueError: pass # Installed packages for k in ["installed_packages", "installed_packages_array"]: if k in raw_data: try: report["installed_packages"] = int(raw_data[k]) except ValueError: # Might be a string like "package1,package2" - count them pkgs = raw_data[k] if pkgs: report["installed_packages"] = len(pkgs.split(",")) # Firewall for k in ["firewall_active", "firewall_installed"]: if k in raw_data and raw_data[k] in ("1", "true", "yes"): report["firewall_active"] = True break # Malware scanner for k in ["malware_scanner_installed", "malware_scanner"]: if k in raw_data and raw_data[k] in ("1", "true", "yes"): report["malware_scanner"] = True break # Parse warnings for w in warnings_raw: parts = w.split("|") if len(parts) >= 2: report["warnings"].append({ "test_id": parts[0].strip() if len(parts) > 0 else "", "severity": parts[1].strip() if len(parts) > 1 else "", "description": parts[2].strip() if len(parts) > 2 else parts[1].strip(), "solution": parts[3].strip() if len(parts) > 3 else "", }) # Parse suggestions for s in suggestions_raw: parts = s.split("|") if len(parts) >= 2: report["suggestions"].append({ "test_id": parts[0].strip() if len(parts) > 0 else "", "description": parts[1].strip() if len(parts) > 1 else "", "solution": parts[2].strip() if len(parts) > 2 else "", "details": parts[3].strip() if len(parts) > 3 else "", }) # Parse lynis-output.log (stdout) for section checks, fallback to lynis.log. # The same file gets parsed twice — once for sections/checks (this block), # once for warnings/suggestions/software (block below). Read once into # `_log_lines` and share the list across both passes so we don't pay the # disk + decode cost twice. Audit Tier 6 — `parse_lynis_report` lee # archivo entero a memoria 2 veces. report["sections"] = [] output_file = "/var/log/lynis-output.log" log_file = output_file if os.path.isfile(output_file) else "/var/log/lynis.log" _log_lines = [] if os.path.isfile(log_file): try: with open(log_file, 'r') as f: _log_lines = f.readlines() except Exception: _log_lines = [] if _log_lines: try: import re log_lines = _log_lines current_section = None current_checks = [] for line in log_lines: line = line.rstrip('\n') stripped = line.strip() # Detect section headers: "[+] Boot and services" # Use 'in' check first for speed, then regex for extraction if '[+]' in stripped: section_match = re.search(r'\[\+\]\s+(.+)', stripped) if section_match: # Save previous section if current_section and current_checks: report["sections"].append({ "name": current_section, "checks": current_checks, }) current_section = section_match.group(1).strip() # Remove trailing dashes from names like "Boot and services ------" current_section = re.sub(r'\s*-+\s*$', '', current_section) current_checks = [] continue # Skip separator lines, empty, banner lines if stripped.startswith('---') or not stripped: continue if stripped.startswith('===') or stripped.startswith('#'): current_section = None # Stop parsing after results summary continue # Detect any line with [ STATUS ] pattern (covers -, File:, Directory:, etc.) # After ANSI cleaning, cursor-positioning sequences are removed so # there may be only 1 space between the name and [ STATUS ]. # Strategy: find the LAST occurrence of [ ... ] on the line. bracket_match = re.search(r'\[\s*([A-Za-z0-9_ /.:!-]+?)\s*\]\s*$', stripped) if bracket_match and current_section: # Everything before the bracket is the check name bracket_start = bracket_match.start() check_name = stripped[:bracket_start].strip() check_status = bracket_match.group(1).strip() # Remove leading - or * from name check_name = re.sub(r'^[-*]+\s*', '', check_name).strip() # Skip noise lines (too short, dots-only, plugin progress) if (check_name and not check_name.startswith('..') and len(check_name) > 2 and check_name not in ('..', '...', '....')): current_checks.append({ "name": check_name, "status": check_status, }) continue # Detect sub-results: " Result: found 35 running services" # After strip(), line becomes "Result: ..." (no leading spaces) if stripped.startswith("Result:") and current_section and current_checks: detail = stripped[7:].strip() if detail: current_checks[-1]["detail"] = detail continue # Extract key data from the info block and summary # Format: "Key: value" or "Key : value" if ":" in stripped: if not report["hardening_index"] and "Hardening index" in stripped: m = re.search(r'Hardening index\s*:\s*(\d+)', stripped) if m: report["hardening_index"] = int(m.group(1)) elif report["tests_performed"] == 0 and "Tests performed" in stripped: m = re.search(r'Tests performed\s*:\s*(\d+)', stripped) if m: report["tests_performed"] = int(m.group(1)) elif not report["kernel_version"] and "Kernel version" in stripped: m = re.search(r'Kernel version\s*:\s*(.+)', stripped) if m: report["kernel_version"] = m.group(1).strip() elif not report["hostname"] and stripped.startswith("Hostname"): m = re.search(r'Hostname\s*:\s*(.+)', stripped) if m: val = m.group(1).strip() if val and val != "N/A": report["hostname"] = val elif not report["os_name"] and "Operating system name" in stripped: m = re.search(r'Operating system name\s*:\s*(.+)', stripped) if m: report["os_name"] = m.group(1).strip() elif not report["os_version"] and "Operating system version" in stripped: m = re.search(r'Operating system version\s*:\s*(.+)', stripped) if m: report["os_version"] = m.group(1).strip() elif not report["os_fullname"] and "Operating system:" in stripped: m = re.search(r'Operating system\s*:\s*(.+)', stripped) if m: report["os_fullname"] = m.group(1).strip() elif not report["lynis_version"] and "Program version" in stripped: m = re.search(r'Program version\s*:\s*(.+)', stripped) if m: report["lynis_version"] = m.group(1).strip() elif not report["datetime_start"] and "report_datetime_start" in stripped: pass # already from .dat # Save last section if current_section and current_checks: report["sections"].append({ "name": current_section, "checks": current_checks, }) # Filter out sections with no meaningful checks report["sections"] = [ s for s in report["sections"] if len(s["checks"]) > 0 ] # Store debug info about parsing report["_parse_debug"] = { "source": log_file, "total_lines": len(log_lines), "sections_found": len(report["sections"]), "section_names": [s["name"] for s in report["sections"]], } except Exception as e: report["sections"] = [] report["_parse_debug"] = {"error": str(e)} # Post-processing: extract firewall/malware status from parsed sections # This catches cases where report.dat doesn't have firewall_active but the # stdout shows "Checking host based firewall [ ACTIVE ]" if not report["firewall_active"] and report["sections"]: for section in report["sections"]: section_lower = section["name"].lower() if "firewall" in section_lower: for check in section["checks"]: name_lower = check["name"].lower() status_upper = check["status"].upper() if "host based firewall" in name_lower or "checking host" in name_lower: if status_upper in ("ACTIVE", "ENABLED", "FOUND", "OK"): report["firewall_active"] = True break if "iptables" in name_lower and status_upper in ("ACTIVE", "FOUND", "OK"): report["firewall_active"] = True break if report["firewall_active"]: break # Also check pve-firewall directly (Proxmox uses its own firewall service) if not report["firewall_active"]: try: rc, out, _ = _run_cmd(["systemctl", "is-active", "pve-firewall"]) if rc == 0 and out.strip() == "active": report["firewall_active"] = True except Exception: pass # Extract malware scanner status from sections if not report["malware_scanner"] and report["sections"]: for section in report["sections"]: for check in section["checks"]: name_lower = check["name"].lower() status_upper = check["status"].upper() if any(x in name_lower for x in ["malware", "clamav", "rkhunter", "chkrootkit"]): if status_upper in ("FOUND", "INSTALLED", "OK"): report["malware_scanner"] = True break if report["malware_scanner"]: break # Always parse lynis-output.log for warnings, suggestions, software # components. The report.dat is often sparse/empty on many systems. # Reuse `_log_lines` already loaded above instead of re-opening the file. if _log_lines: try: import re stdout_lines = _log_lines in_warnings = False in_suggestions = False in_software = False stdout_warnings = [] stdout_suggestions = [] last_suggestion = None for sline in stdout_lines: sline = sline.rstrip('\n') sstripped = sline.strip() # Detect "Warnings (N):" header if re.match(r'^Warnings\s*\(\d+\)\s*:', sstripped): in_warnings = True in_suggestions = False in_software = False last_suggestion = None continue # Detect "Suggestions (N):" header if re.match(r'^Suggestions\s*\(\d+\)\s*:', sstripped): in_suggestions = True in_warnings = False in_software = False last_suggestion = None continue # Detect "Software components:" section if "Software components:" in sstripped: in_software = True in_warnings = False in_suggestions = False last_suggestion = None continue # End of sections on major separators if sstripped.startswith('==='): in_warnings = False in_suggestions = False in_software = False last_suggestion = None continue # Parse "Software components" for firewall/malware # Format: "- Firewall [V]" or "[X]" if in_software: sw_match = re.match(r'^-\s+(.+?)\s+\[([VX])\]', sstripped) if sw_match: sw_name = sw_match.group(1).strip().lower() sw_status = sw_match.group(2) if "firewall" in sw_name and sw_status == "V": report["firewall_active"] = True if "malware" in sw_name and sw_status == "V": report["malware_scanner"] = True # Parse warning lines: "! Warning text [TEST-ID]" if in_warnings and sstripped.startswith('!'): wm = re.match(r'^!\s+(.+?)\s+\[([A-Z0-9_-]+)\]', sstripped) if wm: stdout_warnings.append({ "test_id": wm.group(2), "severity": "Warning", "description": wm.group(1).strip(), "solution": "", }) # Parse suggestion lines: "* Suggestion text [TEST-ID]" if in_suggestions: if sstripped.startswith('*'): sm = re.match(r'^\*\s+(.+?)\s+\[([A-Z0-9_-]+)\]', sstripped) if sm: last_suggestion = { "test_id": sm.group(2), "description": sm.group(1).strip(), "solution": "", "details": "", } stdout_suggestions.append(last_suggestion) elif last_suggestion and sstripped.startswith('- Details'): dm = re.match(r'^-\s*Details\s*:\s*(.+)', sstripped) if dm: last_suggestion["details"] = dm.group(1).strip() elif last_suggestion and sstripped.startswith('- Solution'): sm2 = re.match(r'^-\s*Solution\s*:\s*(.+)', sstripped) if sm2: last_suggestion["solution"] = sm2.group(1).strip() # Use stdout data if report.dat had none if len(report["warnings"]) == 0 and stdout_warnings: report["warnings"] = stdout_warnings if len(report["suggestions"]) == 0 and stdout_suggestions: report["suggestions"] = stdout_suggestions except Exception: pass # Fallback: datetime from file modification time if not report["datetime_start"]: for fpath in ["/var/log/lynis-output.log", "/var/log/lynis-report.dat"]: if os.path.isfile(fpath): try: import time mtime = os.path.getmtime(fpath) report["datetime_start"] = time.strftime( "%Y-%m-%d %H:%M", time.localtime(mtime) ) break except Exception: pass # Fallback: get kernel from uname if still empty if not report["kernel_version"]: try: rc, out, _ = _run_cmd(["uname", "-r"]) if rc == 0 and out.strip(): report["kernel_version"] = out.strip() except Exception: pass # Fallback: get hostname from system if not report["hostname"]: try: import socket report["hostname"] = socket.gethostname() except Exception: pass # Fallback: get installed packages count if report["installed_packages"] == 0: try: rc, out, _ = _run_cmd(["dpkg", "-l"]) if rc == 0 and out: # Count lines that start with "ii " (installed packages) count = sum(1 for l in out.splitlines() if l.startswith("ii ")) if count > 0: report["installed_packages"] = count except Exception: pass # ── Proxmox Context: classify warnings/suggestions ──────────────── # Proxmox VE is a hypervisor with specific requirements that cause # Lynis to flag items that are normal/expected in this environment. # We classify each finding and calculate an adjusted score. PVE_WARNING_CONTEXT = { "FIRE-4512": { "reason": "Proxmox uses pve-firewall which manages iptables/nftables rules dynamically. Direct iptables rules are not used.", "expected": True, }, "NETW-3015": { "reason": "Network bridges (vmbr*) operate in promiscuous mode by design to forward traffic between VMs/containers.", "expected": True, }, "MAIL-8818": { "reason": "Postfix is used by Proxmox for system notifications. The SMTP banner can be customized but is low risk on an internal server.", "expected": False, "severity_override": "low", }, "NETW-2705": { "reason": "Single DNS server is common in home/lab environments. Add a secondary DNS in /etc/resolv.conf if possible.", "expected": False, "severity_override": "low", }, "PKGS-7392": { "reason": "Package updates should be applied regularly. Check if the 'vulnerable' packages are Proxmox-specific packages pending a PVE update.", "expected": False, }, } PVE_SUGGESTION_CONTEXT = { "BOOT-5122": { "reason": "GRUB password is recommended for physical servers but less critical for headless/remote Proxmox nodes.", "expected": False, "severity_override": "low", }, "BOOT-5264": { "reason": "Many Proxmox core services (pve-*, corosync, spiceproxy, etc.) run without systemd hardening. This is by design as they need broad system access.", "expected": True, }, "KRNL-5788": { "reason": "Proxmox uses its own kernel (pve-kernel) which may not place vmlinuz in the standard location.", "expected": True, }, "AUTH-9282": { "reason": "Proxmox system accounts (www-data, backup, etc.) don't use password expiry. Only applies to interactive user accounts.", "expected": True, }, "AUTH-9284": { "reason": "Locked system accounts are normal in Proxmox (daemon, nobody, etc.).", "expected": True, }, "USB-1000": { "reason": "USB passthrough to VMs may require USB drivers. Disable only if USB passthrough is not needed.", "expected": False, "severity_override": "low", }, "STRG-1846": { "reason": "FireWire is typically not used in modern servers. Safe to disable.", "expected": False, "severity_override": "low", }, "NETW-3200": { "reason": "Protocols dccp, sctp, rds, tipc are typically not needed. Can be disabled via modprobe blacklist.", "expected": False, }, "SSH-7408": { "reason": "SSH hardening is recommended but PermitRootLogin is required for Proxmox API/CLI management. Other SSH settings can be tuned.", "expected": False, }, "FILE-6310": { "reason": "Separate partitions for /home and /var are best practice but Proxmox typically uses a simple partition layout with LVM-thin for VM storage.", "expected": True, }, "KRNL-6000": { "reason": "Some sysctl values differ because Proxmox needs IP forwarding, bridge-nf-call, and relaxed kernel settings for VM/container networking.", "expected": True, }, "HRDN-7230": { "reason": "A malware scanner (rkhunter, ClamAV) is recommended but optional for a dedicated hypervisor.", "expected": False, "severity_override": "low", }, "HRDN-7222": { "reason": "Restricting compiler access is good practice on production servers. Less critical on a hypervisor where only root has access.", "expected": False, "severity_override": "low", }, "FINT-4350": { "reason": "File integrity monitoring (AIDE, Tripwire) is recommended for production but optional for home/lab environments.", "expected": False, "severity_override": "low", }, "ACCT-9622": { "reason": "Process accounting is useful for forensics but not required for a hypervisor.", "expected": False, "severity_override": "low", }, "ACCT-9626": { "reason": "Sysstat is useful for performance monitoring but not a security requirement.", "expected": False, "severity_override": "low", }, "ACCT-9628": { "reason": "Auditd provides detailed audit logging. Recommended for production, optional for home/lab.", "expected": False, "severity_override": "low", }, "TOOL-5002": { "reason": "Automation tools (Ansible, Puppet) are useful for multi-node clusters but not required for single-node setups.", "expected": True, }, "LOGG-2154": { "reason": "External logging is recommended for production but optional for single-node environments.", "expected": False, "severity_override": "low", }, "BANN-7126": { "reason": "Legal banners in /etc/issue are recommended for compliance but not a security risk.", "expected": False, "severity_override": "low", }, "BANN-7130": { "reason": "Legal banners in /etc/issue.net are recommended for compliance but not a security risk.", "expected": False, "severity_override": "low", }, } # Apply Proxmox context to warnings pve_expected_warnings = 0 for w in report["warnings"]: tid = w.get("test_id", "") ctx = PVE_WARNING_CONTEXT.get(tid) if ctx: w["proxmox_context"] = ctx["reason"] w["proxmox_expected"] = ctx.get("expected", False) if ctx.get("severity_override"): w["proxmox_severity"] = ctx["severity_override"] if ctx.get("expected", False): pve_expected_warnings += 1 else: w["proxmox_context"] = "" w["proxmox_expected"] = False # Apply Proxmox context to suggestions pve_expected_suggestions = 0 for s in report["suggestions"]: tid = s.get("test_id", "") ctx = PVE_SUGGESTION_CONTEXT.get(tid) if ctx: s["proxmox_context"] = ctx["reason"] s["proxmox_expected"] = ctx.get("expected", False) if ctx.get("severity_override"): s["proxmox_severity"] = ctx["severity_override"] if ctx.get("expected", False): pve_expected_suggestions += 1 else: s["proxmox_context"] = "" s["proxmox_expected"] = False # Calculate Proxmox-adjusted score # Lynis score is based on total tests and findings. # We boost the score proportionally to the expected items. raw_score = report["hardening_index"] or 0 total_findings = len(report["warnings"]) + len(report["suggestions"]) expected_findings = pve_expected_warnings + pve_expected_suggestions if total_findings > 0 and raw_score > 0: # Each finding roughly reduces the score. Expected findings should # not penalize. We estimate the boost proportionally. penalty_per_finding = (100 - raw_score) / max(total_findings, 1) boost = int(penalty_per_finding * expected_findings * 0.8) adjusted_score = min(100, raw_score + boost) else: adjusted_score = raw_score report["proxmox_adjusted_score"] = adjusted_score report["proxmox_expected_warnings"] = pve_expected_warnings report["proxmox_expected_suggestions"] = pve_expected_suggestions report["proxmox_context_applied"] = True return report # ------------------------------------------------------------------- # Uninstall Functions # ------------------------------------------------------------------- def uninstall_fail2ban(): """ Uninstall Fail2Ban and clean up all configuration. Returns (success, message). """ try: # Stop fail2ban service _run_cmd(["systemctl", "stop", "fail2ban"], timeout=30) _run_cmd(["systemctl", "disable", "fail2ban"], timeout=10) # Stop and remove auth logger services _run_cmd(["systemctl", "stop", "proxmox-auth-logger.service"], timeout=10) _run_cmd(["systemctl", "disable", "proxmox-auth-logger.service"], timeout=10) _run_cmd(["systemctl", "stop", "ssh-auth-logger.service"], timeout=10) _run_cmd(["systemctl", "disable", "ssh-auth-logger.service"], timeout=10) # Remove systemd service files for svc_file in [ "/etc/systemd/system/proxmox-auth-logger.service", "/etc/systemd/system/ssh-auth-logger.service", ]: if os.path.exists(svc_file): os.remove(svc_file) _run_cmd(["systemctl", "daemon-reload"], timeout=10) # Remove log files created by auth loggers for log_file in ["/var/log/proxmox-auth.log", "/var/log/ssh-auth.log"]: if os.path.exists(log_file): os.remove(log_file) # Purge fail2ban package _run_cmd(["apt-get", "purge", "-y", "fail2ban"], timeout=120) # Remove configuration files for cfg_file in [ "/etc/fail2ban/jail.d/proxmox.conf", "/etc/fail2ban/jail.d/proxmenux.conf", "/etc/fail2ban/filter.d/proxmox.conf", "/etc/fail2ban/filter.d/proxmenux.conf", "/etc/fail2ban/jail.local", ]: if os.path.exists(cfg_file): os.remove(cfg_file) # Restore SSH MaxAuthTries if backup exists base_dir = "/usr/local/share/proxmenux" backup_file = os.path.join(base_dir, "sshd_maxauthtries_backup") sshd_config = "/etc/ssh/sshd_config" if os.path.exists(backup_file) and os.path.exists(sshd_config): try: with open(backup_file, 'r') as f: original_val = f.read().strip() if original_val: with open(sshd_config, 'r') as f: content = f.read() import re content = re.sub( r'^MaxAuthTries.*$', f'MaxAuthTries {original_val}', content, flags=re.MULTILINE ) with open(sshd_config, 'w') as f: f.write(content) _run_cmd(["systemctl", "reload", "sshd"], timeout=10) os.remove(backup_file) except Exception: pass # Remove journald drop-in journald_dropin = "/etc/systemd/journald.conf.d/proxmenux-loglevel.conf" if os.path.exists(journald_dropin): os.remove(journald_dropin) _run_cmd(["systemctl", "restart", "systemd-journald"], timeout=30) # Update component status components_file = os.path.join(base_dir, "components_status.json") if os.path.exists(components_file): try: import json with open(components_file, 'r') as f: components = json.load(f) if "fail2ban" in components: components["fail2ban"]["status"] = "removed" components["fail2ban"]["version"] = "" with open(components_file, 'w') as f: json.dump(components, f, indent=2) except Exception: pass return True, "Fail2Ban has been uninstalled successfully" except Exception as e: return False, f"Error uninstalling Fail2Ban: {str(e)}" def uninstall_lynis(): """ Uninstall Lynis and clean up all files. Returns (success, message). """ try: import shutil # Remove installation directory if os.path.exists("/opt/lynis"): shutil.rmtree("/opt/lynis") # Remove wrapper script if os.path.exists("/usr/local/bin/lynis"): os.remove("/usr/local/bin/lynis") # Remove report files for report_file in [ "/var/log/lynis-report.dat", "/var/log/lynis.log", "/var/log/lynis-output.log", ]: if os.path.exists(report_file): os.remove(report_file) # Update component status base_dir = "/usr/local/share/proxmenux" components_file = os.path.join(base_dir, "components_status.json") if os.path.exists(components_file): try: import json with open(components_file, 'r') as f: components = json.load(f) if "lynis" in components: components["lynis"]["status"] = "removed" components["lynis"]["version"] = "" with open(components_file, 'w') as f: json.dump(components, f, indent=2) except Exception: pass return True, "Lynis has been uninstalled successfully" except Exception as e: return False, f"Error uninstalling Lynis: {str(e)}"