mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-05-14 04:55:01 +00:00
1126 lines
38 KiB
Python
1126 lines
38 KiB
Python
"""
Authentication Manager Module

Handles all authentication-related operations including:
- Loading/saving auth configuration
- Password hashing and verification
- JWT token generation and validation
- Auth status checking
- Two-Factor Authentication (2FA/TOTP)
"""
|
|
|
|
import os
|
|
import json
|
|
import hashlib
|
|
import hmac
|
|
import secrets
|
|
import base64
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import jwt
|
|
JWT_AVAILABLE = True
|
|
except ImportError:
|
|
JWT_AVAILABLE = False
|
|
print("Warning: PyJWT not available. Authentication features will be limited.")
|
|
|
|
try:
|
|
import pyotp
|
|
import segno
|
|
import io
|
|
import base64
|
|
TOTP_AVAILABLE = True
|
|
except ImportError:
|
|
TOTP_AVAILABLE = False
|
|
print("Warning: pyotp/segno not available. 2FA features will be disabled.")
|
|
|
|
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
CONFIG_DIR = Path.home().joinpath(".config", "proxmenux-monitor")
AUTH_CONFIG_FILE = CONFIG_DIR.joinpath("auth.json")

# Marker for legacy installs that started out with the hardcoded JWT secret.
# The audit (Tier 4 #22) flagged that constant: anyone with access to the
# public repo could forge JWTs against any deployment. A random per-install
# secret is now generated on first use and persisted in auth.json. Tokens
# issued under the legacy secret stop verifying once the migration runs, so
# users have to log in once. That is intentional and accepted by the audit.
_LEGACY_JWT_SECRET = "proxmenux-monitor-secret-key-change-in-production"
JWT_ALGORITHM = "HS256"
TOKEN_EXPIRATION_HOURS = 24

# Audit Tier 5: tokens carry issuer/audience claims so they cannot be reused
# against another deployment or service that happens to share the same JWT
# secret. Checked in `verify_token`, with a permissive fallback for tokens
# issued before the rollout.
JWT_ISSUER = "proxmenux-monitor"
JWT_AUDIENCE = "api"

# Password-hash format: pbkdf2_sha256 with 600k iterations (OWASP 2023+
# baseline), built on the stdlib `hashlib.pbkdf2_hmac` only.
# On-disk layout: "pbkdf2_sha256$<iterations>$<salt_b64>$<hash_b64>".
# A legacy SHA-256 digest (one line of 64 hex characters) is still accepted
# for one last verification and re-hashed on the next successful login
# (lazy migration).
_PWD_PBKDF2_ITERS = 600_000
_PWD_PBKDF2_PREFIX = "pbkdf2_sha256$"
|
|
|
|
|
|
def ensure_config_dir():
    """Create `CONFIG_DIR` (and any missing parents); no-op if it already exists."""
    CONFIG_DIR.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def _default_auth_config():
    """Build a fresh dict describing an install with auth not configured."""
    return {
        "enabled": False,
        "username": None,
        "password_hash": None,
        "declined": False,
        "configured": False,
        "totp_enabled": False,
        "totp_secret": None,
        "backup_codes": [],
        "api_tokens": [],
        "revoked_tokens": []
    }


def load_auth_config():
    """
    Read the authentication configuration from `AUTH_CONFIG_FILE`.

    The returned dict has this shape:
    {
        "enabled": bool,
        "username": str,
        "password_hash": str,
        "declined": bool,
        "configured": bool,
        "totp_enabled": bool,      # 2FA enabled flag
        "totp_secret": str,        # TOTP secret key
        "backup_codes": list,      # List of backup codes
        "api_tokens": list,        # List of stored API token metadata
        "revoked_tokens": list     # List of revoked token hashes
    }

    When the file does not exist, or when reading/parsing it raises, a
    default "not configured" dict is returned instead (the error is printed
    in the latter case). Keys missing from an existing file are filled in
    with defaults; "enabled", "username" and "password_hash" are not
    back-filled.
    """
    if not AUTH_CONFIG_FILE.exists():
        return _default_auth_config()

    try:
        with open(AUTH_CONFIG_FILE, 'r') as handle:
            loaded = json.load(handle)

        # Back-fill fields that older files may lack. "configured" is derived
        # from the enabled/declined flags, so "declined" is filled in first.
        loaded.setdefault("declined", False)
        derived_configured = loaded.get("enabled", False) or loaded.get("declined", False)
        loaded.setdefault("configured", derived_configured)
        loaded.setdefault("totp_enabled", False)
        loaded.setdefault("totp_secret", None)
        for list_key in ("backup_codes", "api_tokens", "revoked_tokens"):
            loaded.setdefault(list_key, [])
        return loaded
    except Exception as e:
        print(f"Error loading auth config: {e}")
        return _default_auth_config()
|
|
|
|
|
|
def save_auth_config(config):
    """Save authentication configuration to file.

    The JSON is written to a temporary file in the same directory and then
    moved over `AUTH_CONFIG_FILE` with `os.replace`, so a crash or a
    serialization error part-way through never leaves a truncated auth.json
    behind. The temporary file comes from `tempfile.mkstemp`, which creates
    it readable and writable only by the creating user; those permissions
    carry over to the saved file.

    Args:
        config: JSON-serializable dict to persist.

    Returns:
        True when the file was written, False when writing failed (the
        error is printed).
    """
    import tempfile

    ensure_config_dir()
    tmp_path = None
    try:
        fd, tmp_path = tempfile.mkstemp(
            dir=str(AUTH_CONFIG_FILE.parent),
            prefix=AUTH_CONFIG_FILE.name + ".",
            suffix=".tmp",
        )
        with os.fdopen(fd, 'w') as f:
            json.dump(config, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, AUTH_CONFIG_FILE)
        tmp_path = None  # moved into place; nothing left to clean up
        return True
    except Exception as e:
        print(f"Error saving auth config: {e}")
        return False
    finally:
        # Best-effort removal of a leftover temp file after a failed write.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
|
|
|
|
|
|
def _get_jwt_secret():
    """Return the per-install JWT signing secret, creating it when absent.

    The secret is stored in `auth.json` under the `jwt_secret` key. If the
    loaded config has no string of at least 32 characters there (fresh
    install, or migration from the legacy hardcoded constant), a new value
    from `secrets.token_urlsafe(48)` is stored in the config and saved.
    An existing valid secret is returned unchanged (rotating it would log
    out every active session). Audit Tier 4 #22.

    The result of the save is not checked here.
    """
    cfg = load_auth_config()
    secret_value = cfg.get("jwt_secret")
    if not isinstance(secret_value, str) or len(secret_value) < 32:
        secret_value = secrets.token_urlsafe(48)
        cfg["jwt_secret"] = secret_value
        save_auth_config(cfg)
    return secret_value
|
|
|
|
|
|
# Server-side counterpart of the frontend's `validatePasswordStrength`.
# Defense in depth: the UI enforces these rules, but a direct API caller
# (curl, scripted setup, custom client) bypasses the JS, so the same minimum
# has to be enforced here. Audit Tier 6 - weak password policy.
_OBVIOUS_PASSWORDS = {
    "password", "password1", "password123",
    "12345678", "123456789", "1234567890",
    "qwerty", "qwertyuiop", "letmein", "welcome",
    "admin", "administrator", "root", "proxmox", "proxmenux",
    "changeme", "abcdefgh",
}


def _validate_password_strength(pw):
    """Check `pw` against the password policy.

    Returns None when the password is acceptable, otherwise a
    human-readable message naming the first rule it breaks: minimum length
    of 10, at least 3 of the 4 character classes, and not (case-insensitively)
    one of `_OBVIOUS_PASSWORDS`.
    """
    if not (isinstance(pw, str) and len(pw) >= 10):
        return "Password must be at least 10 characters"

    has_lower = has_upper = has_digit = has_symbol = False
    for ch in pw:
        has_lower = has_lower or ch.islower()
        has_upper = has_upper or ch.isupper()
        has_digit = has_digit or ch.isdigit()
        has_symbol = has_symbol or not ch.isalnum()

    if has_lower + has_upper + has_digit + has_symbol < 3:
        return "Password must mix at least 3 of: lowercase, uppercase, digits, symbols"

    if pw.lower() in _OBVIOUS_PASSWORDS:
        return "That password is in the common-passwords list — pick something else"

    return None
|
|
|
|
|
|
def hash_password(password):
    """Derive a storable PBKDF2-HMAC-SHA256 hash for `password`.

    Output layout: `pbkdf2_sha256$<iters>$<salt_b64>$<hash_b64>`. A fresh
    16-byte random salt is drawn per call and `_PWD_PBKDF2_ITERS`
    iterations are used (600k, OWASP 2023+ baseline). Stdlib only, so no
    bcrypt / argon2-cffi dependency is added to the AppImage build. See
    audit Tier 4 #23.
    """
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        'sha256',
        password.encode('utf-8'),
        salt,
        _PWD_PBKDF2_ITERS,
        dklen=32,
    )
    salt_b64 = base64.b64encode(salt).decode('ascii')
    digest_b64 = base64.b64encode(digest).decode('ascii')
    # The prefix already ends with '$', so only the remaining fields are joined.
    return _PWD_PBKDF2_PREFIX + "$".join((str(_PWD_PBKDF2_ITERS), salt_b64, digest_b64))
|
|
|
|
|
|
def _verify_pbkdf2(password, stored):
    """Verify a PBKDF2 hash. Returns True on match, False on any failure.

    Args:
        password: Candidate plaintext password.
        stored: Stored hash in the form
            `pbkdf2_sha256$<iters>$<salt_b64>$<hash_b64>`.

    Returns:
        True only when the digest derived from `password` equals the stored
        one. A malformed record (wrong field count, non-numeric or
        non-positive iteration count, bad base64, empty hash) or an input
        that cannot be encoded yields False rather than an exception.
    """
    try:
        # `pbkdf2_sha256$<iters>$<salt_b64>$<hash_b64>`
        body = stored[len(_PWD_PBKDF2_PREFIX):]
        iters_str, salt_b64, hash_b64 = body.split('$', 2)
        iters = int(iters_str)
        salt = base64.b64decode(salt_b64)
        expected = base64.b64decode(hash_b64)
        # pbkdf2_hmac raises ValueError for iterations < 1 or dklen < 1;
        # treat such records as a plain mismatch.
        if iters <= 0 or not expected:
            return False
        # Kept inside the try so a derivation error also honours the
        # "False on any failure" contract.
        derived = hashlib.pbkdf2_hmac(
            'sha256', password.encode('utf-8'), salt, iters, dklen=len(expected)
        )
    except Exception:
        return False
    # Constant-time comparison of the two digests.
    return hmac.compare_digest(derived, expected)
|
|
|
|
|
|
def _is_legacy_sha256(stored):
    """Tell whether `stored` has the shape of the old unsalted SHA-256 digest.

    That shape is a `str` of exactly 64 characters, all of them hexadecimal
    digits (either letter case).
    """
    if not isinstance(stored, str) or len(stored) != 64:
        return False
    return set(stored.lower()) <= set('0123456789abcdef')
|
|
|
|
|
|
def verify_password(password, password_hash):
    """Check `password` against a stored hash.

    Handles both the current PBKDF2 format and the legacy unsalted SHA-256
    hex digest. The legacy branch exists so existing accounts can log in
    one more time and have their hash upgraded through
    `_maybe_rehash_password` (lazy migration in `authenticate()`).

    Returns False for a missing, empty, non-string or unrecognized hash.
    """
    if not (isinstance(password_hash, str) and password_hash):
        return False

    if password_hash.startswith(_PWD_PBKDF2_PREFIX):
        return _verify_pbkdf2(password, password_hash)

    if not _is_legacy_sha256(password_hash):
        return False

    candidate = hashlib.sha256(password.encode('utf-8')).hexdigest()
    return hmac.compare_digest(candidate, password_hash)
|
|
|
|
|
|
def _maybe_rehash_password(password, current_hash):
    """Produce a PBKDF2 replacement when `current_hash` is a legacy SHA-256 digest.

    Returns the new hash string in that case and None otherwise (hash is
    already PBKDF2, or not recognized). Nothing is persisted here; saving
    the new hash to auth.json is the caller's job.
    """
    needs_upgrade = _is_legacy_sha256(current_hash)
    return hash_password(password) if needs_upgrade else None
|
|
|
|
|
|
def generate_token(username):
    """Generate a JWT token for the given username.

    The token carries `username`, `iat`, `exp` (`TOKEN_EXPIRATION_HOURS`
    after `iat`), `iss` and `aud` claims and is signed with the per-install
    secret using `JWT_ALGORITHM`.

    Args:
        username: Value stored in the `username` claim.

    Returns:
        The encoded token, or None when PyJWT is unavailable or encoding
        fails (the error is printed).
    """
    if not JWT_AVAILABLE:
        return None

    from datetime import timezone

    # One timezone-aware reading of the clock: `exp` is then exactly
    # TOKEN_EXPIRATION_HOURS after `iat`, and the deprecated naive
    # `datetime.utcnow()` is avoided.
    now = datetime.now(timezone.utc)
    payload = {
        'username': username,
        'exp': now + timedelta(hours=TOKEN_EXPIRATION_HOURS),
        'iat': now,
        'iss': JWT_ISSUER,
        'aud': JWT_AUDIENCE,
    }

    try:
        token = jwt.encode(payload, _get_jwt_secret(), algorithm=JWT_ALGORITHM)
        return token
    except Exception as e:
        print(f"Error generating token: {e}")
        return None
|
|
|
|
|
|
# Process-local cache of the revoked-token hashes, so the list is not read
# from disk on every request. It is refreshed when the TTL has elapsed or
# when auth.json's mtime changes, so a revocation made by another process or
# before a restart still propagates within seconds.
_REVOKED_CACHE = {'set': None, 'mtime': 0.0, 'fetched_at': 0.0}
_REVOKED_TTL = 30.0


def _get_revoked_tokens_cached():
    """Return the revoked-token hashes as a frozenset, cached for ~30s.

    The cached value is reused only while it is younger than `_REVOKED_TTL`
    seconds and the config file's mtime equals the one recorded at load
    time; otherwise the config is reloaded. A failing `stat()` is treated
    as mtime 0.0.
    """
    import time

    now = time.monotonic()
    try:
        mtime = AUTH_CONFIG_FILE.stat().st_mtime
    except OSError:
        mtime = 0.0

    cached = _REVOKED_CACHE['set']
    still_fresh = (
        cached is not None
        and now - _REVOKED_CACHE['fetched_at'] < _REVOKED_TTL
        and mtime == _REVOKED_CACHE['mtime']
    )
    if still_fresh:
        return cached

    revoked = frozenset(load_auth_config().get("revoked_tokens", []))
    _REVOKED_CACHE.update({'set': revoked, 'mtime': mtime, 'fetched_at': now})
    return revoked
|
|
|
|
|
|
def _invalidate_revoked_cache():
    """Drop the cached revoked-token set so the next lookup reloads the config."""
    _REVOKED_CACHE.update({'set': None})
|
|
|
|
|
|
def verify_token_full(token):
    """Verify a JWT like `verify_token`, additionally returning its `scope`.

    Returns `(username, scope)` for a valid token and `(None, None)` when
    PyJWT is unavailable, the token is empty, revoked, expired or invalid.
    Tokens without a `scope` claim (issued before scopes existed) are
    reported as `'full_admin'` so legacy sessions keep working unchanged.
    Audit Tier 6 - API JWT tokens valid for 365 days without scope.
    """
    if not (JWT_AVAILABLE and token):
        return None, None

    try:
        digest = hashlib.sha256(token.encode()).hexdigest()
        if digest in _get_revoked_tokens_cached():
            return None, None

        # Strict decode first; if it fails on a missing or mismatching
        # iss/aud claim, retry without passing issuer/audience.
        claim_errors = (
            jwt.MissingRequiredClaimError,
            jwt.InvalidAudienceError,
            jwt.InvalidIssuerError,
        )
        try:
            claims = jwt.decode(
                token,
                _get_jwt_secret(),
                algorithms=[JWT_ALGORITHM],
                audience=JWT_AUDIENCE,
                issuer=JWT_ISSUER,
            )
        except claim_errors:
            claims = jwt.decode(token, _get_jwt_secret(), algorithms=[JWT_ALGORITHM])

        username = claims.get('username')
        scope = claims.get('scope', 'full_admin')
        return username, scope
    except jwt.ExpiredSignatureError:
        return None, None
    except jwt.InvalidTokenError:
        return None, None
|
|
|
|
|
|
def verify_token(token):
    """
    Validate a JWT and return the username it was issued for.

    Returns None when PyJWT is unavailable, the token is empty, revoked,
    expired or otherwise invalid (expiry and validation failures are
    printed).
    """
    if not (JWT_AVAILABLE and token):
        return None

    try:
        # The revoked list comes from an in-memory cache (TTL + mtime) so
        # high-RPS endpoints do not reread auth.json on every @require_auth
        # call.
        digest = hashlib.sha256(token.encode()).hexdigest()
        if digest in _get_revoked_tokens_cached():
            return None

        # Only the per-install secret is used. Tokens signed with the legacy
        # hardcoded secret could be forged by anyone with read access to the
        # public repo, so they are rejected on purpose and the user logs in
        # once to obtain a fresh token.
        # The strict decode checks `iss`/`aud`; if it fails with a missing
        # claim, invalid audience or invalid issuer error, the token is
        # decoded again without those arguments so sessions issued before
        # the iss/aud rollout do not break on upgrade.
        claim_errors = (
            jwt.MissingRequiredClaimError,
            jwt.InvalidAudienceError,
            jwt.InvalidIssuerError,
        )
        try:
            claims = jwt.decode(
                token,
                _get_jwt_secret(),
                algorithms=[JWT_ALGORITHM],
                audience=JWT_AUDIENCE,
                issuer=JWT_ISSUER,
            )
        except claim_errors:
            claims = jwt.decode(token, _get_jwt_secret(), algorithms=[JWT_ALGORITHM])

        return claims.get('username')
    except jwt.ExpiredSignatureError:
        print("Token has expired")
        return None
    except jwt.InvalidTokenError as e:
        print(f"Invalid token: {e}")
        return None
|
|
|
|
|
|
def store_api_token_metadata(token, token_name="API Token"):
    """
    Store API token metadata (hash, name, creation date) for listing and revocation.
    The actual token is never stored - only a hash for identification.

    Args:
        token: The encoded token string.
        token_name: Display name recorded with the entry.

    Returns:
        The metadata entry that was appended to `api_tokens`. The result of
        saving the config is not checked.
    """
    from datetime import timezone

    config = load_auth_config()
    token_hash = hashlib.sha256(token.encode()).hexdigest()
    token_id = token_hash[:16]

    # Read the clock once so `expires_at` is exactly 365 days after
    # `created_at`. The tzinfo is dropped so `isoformat()` keeps the existing
    # "<naive ISO timestamp>Z" format, without the deprecated `utcnow()`.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    token_entry = {
        "id": token_id,
        "name": token_name,
        "token_hash": token_hash,
        "token_prefix": token[:12] + "...",
        "created_at": now.isoformat() + "Z",
        "expires_at": (now + timedelta(days=365)).isoformat() + "Z"
    }

    config.setdefault("api_tokens", [])
    config["api_tokens"].append(token_entry)
    save_auth_config(config)
    return token_entry
|
|
|
|
|
|
def list_api_tokens():
    """
    Return display metadata for every stored API token.

    Each entry holds id, name, token prefix, creation and expiration dates,
    plus a `revoked` flag telling whether the entry's hash is on the revoked
    list. Neither tokens nor their hashes are included in the result.
    """
    config = load_auth_config()
    revoked_hashes = set(config.get("revoked_tokens", []))

    return [
        {
            "id": stored.get("id"),
            "name": stored.get("name", "API Token"),
            "token_prefix": stored.get("token_prefix", "***"),
            "created_at": stored.get("created_at"),
            "expires_at": stored.get("expires_at"),
            "revoked": stored.get("token_hash") in revoked_hashes
        }
        for stored in config.get("api_tokens", [])
    ]
|
|
|
|
|
|
def revoke_api_token(token_id):
    """
    Revoke the API token whose metadata entry has the given ID.

    The entry's hash is appended to `revoked_tokens` (so the token fails
    verification) and the entry is removed from `api_tokens`. On a
    successful save the in-memory revoked cache is invalidated.

    Returns (success: bool, message: str)
    """
    config = load_auth_config()
    tokens = config.get("api_tokens", [])

    # First entry with a matching id, if any.
    target = next((entry for entry in tokens if entry.get("id") == token_id), None)
    if not target:
        return False, "Token not found"

    token_hash = target.get("token_hash")
    revoked_list = config.setdefault("revoked_tokens", [])
    if token_hash in revoked_list:
        return False, "Token is already revoked"

    revoked_list.append(token_hash)

    # Drop the entry from the active tokens list.
    config["api_tokens"] = [entry for entry in tokens if entry.get("id") != token_id]

    if not save_auth_config(config):
        return False, "Failed to save configuration"

    _invalidate_revoked_cache()
    return True, "Token revoked successfully"
|
|
|
|
|
|
def get_auth_status():
    """
    Summarize the stored authentication state.

    Returns a dict of the form:
    {
        "auth_enabled": bool,
        "auth_configured": bool,
        "declined": bool,
        "username": str or None,
        "authenticated": bool,
        "totp_enabled": bool  # 2FA status
    }

    `username` is only reported while auth is enabled, and `authenticated`
    is always False in the value built here.
    """
    config = load_auth_config()
    auth_enabled = config.get("enabled", False)
    visible_username = config.get("username") if config.get("enabled") else None

    status = {
        "auth_enabled": auth_enabled,
        "auth_configured": config.get("configured", False),
        "declined": config.get("declined", False),
        "username": visible_username,
        "authenticated": False,
        "totp_enabled": config.get("totp_enabled", False)
    }
    return status
|
|
|
|
|
|
def setup_auth(username, password):
    """
    Configure authentication with an initial username and password.

    Returns (success: bool, message: str)
    """
    # Refuse when auth is already configured. Without this guard an
    # unauthenticated POST to /api/auth/setup would let an attacker overwrite
    # the existing admin credentials and take over the account. See audit
    # Tier 1 #4.
    if load_auth_config().get("configured", False):
        return False, "Authentication is already configured"

    if not (username and password):
        return False, "Username and password are required"

    policy_error = _validate_password_strength(password)
    if policy_error:
        return False, policy_error

    # NOTE(review): this is a brand-new dict, so any other key present in
    # the previous config (e.g. jwt_secret, api_tokens, revoked_tokens) is
    # not carried over when it is saved - confirm that is intended.
    new_config = {
        "enabled": True,
        "username": username,
        "password_hash": hash_password(password),
        "declined": False,
        "configured": True,
        "totp_enabled": False,
        "totp_secret": None,
        "backup_codes": []
    }

    if not save_auth_config(new_config):
        return False, "Failed to save authentication configuration"
    return True, "Authentication configured successfully"
|
|
|
|
|
|
def decline_auth():
    """
    Record that the user declined to set up authentication.

    Clears credentials and 2FA data, marks the config as declined and
    configured, and saves it.

    Returns (success: bool, message: str)
    """
    config = load_auth_config()
    config.update({
        "enabled": False,
        "declined": True,
        "configured": True,
        "username": None,
        "password_hash": None,
        "totp_enabled": False,
        "totp_secret": None,
        "backup_codes": [],
    })

    saved = save_auth_config(config)
    if not saved:
        return False, "Failed to save configuration"
    return True, "Authentication declined"
|
|
|
|
|
|
def disable_auth():
    """
    Turn authentication off (unlike decline, it can be set up again later).

    Clears credentials and 2FA data and resets the declined/configured
    flags, then saves the config.

    Returns (success: bool, message: str)
    """
    config = load_auth_config()
    config.update({
        "enabled": False,
        "username": None,
        "password_hash": None,
        "declined": False,
        "configured": False,
        "totp_enabled": False,
        "totp_secret": None,
        "backup_codes": [],
    })
    # `api_tokens` and `revoked_tokens` are deliberately left untouched
    # across disable -> re-enable cycles. Wiping them allowed a previously
    # revoked token to verify again because nothing on the deny-list would
    # reject it. Audit Tier 5 - disable_auth() used to clear revoked_tokens.
    _invalidate_revoked_cache()

    saved = save_auth_config(config)
    if not saved:
        return False, "Failed to save configuration"
    return True, "Authentication disabled"
|
|
|
|
|
|
def enable_auth():
    """
    Switch authentication on for an install that already has credentials.

    Fails when the stored config has no username or no password hash.

    Returns (success: bool, message: str)
    """
    config = load_auth_config()

    has_credentials = config.get("username") and config.get("password_hash")
    if not has_credentials:
        return False, "Authentication not configured. Please set up username and password first."

    config.update({"enabled": True, "declined": False})

    if not save_auth_config(config):
        return False, "Failed to save configuration"
    return True, "Authentication enabled"
|
|
|
|
|
|
def change_password(old_password, new_password, totp_code=None):
    """
    Replace the account password.

    The current password must verify and the new one must pass the
    password policy. When 2FA is enabled on the account, a valid TOTP code
    (or backup code) is REQUIRED as well; otherwise an attacker who
    obtained the password (e.g. via shoulder-surfing or phishing) could
    rotate it without the second factor and lock the legitimate user out.
    See audit Tier 1 #10.

    Returns (success: bool, message: str).
    """
    config = load_auth_config()

    if not config.get("enabled"):
        return False, "Authentication is not enabled"

    if not verify_password(old_password, config.get("password_hash", "")):
        return False, "Current password is incorrect"

    policy_error = _validate_password_strength(new_password)
    if policy_error:
        # Prefix the policy message with "New " and lower-case its first letter.
        return False, f"New {policy_error[0].lower()}{policy_error[1:]}"

    # 2FA gate: with TOTP enabled the caller must also prove possession of
    # the second factor.
    if config.get("totp_enabled"):
        if not totp_code:
            return False, "2FA code required to change password"

        account = config.get("username")
        # TOTP is tried first, then the backup-code path (same UX as login).
        second_factor_ok = (
            verify_totp(account, totp_code, use_backup=False)[0]
            or verify_totp(account, totp_code, use_backup=True)[0]
        )
        if not second_factor_ok:
            return False, "Invalid 2FA code"

        # verify_totp may have saved changes (e.g. a consumed backup code),
        # so work from a fresh copy of the config.
        config = load_auth_config()

    config["password_hash"] = hash_password(new_password)

    if not save_auth_config(config):
        return False, "Failed to save new password"
    return True, "Password changed successfully"
|
|
|
|
|
|
def generate_totp_secret():
    """Return a new random base32 TOTP secret, or None when 2FA libraries are missing."""
    return pyotp.random_base32() if TOTP_AVAILABLE else None
|
|
|
|
|
|
def generate_totp_qr(username, secret):
    """
    Render the TOTP provisioning URI for `username`/`secret` as a QR code.

    Returns a `data:image/svg+xml;base64,...` URL containing the SVG, or
    None when the 2FA libraries are unavailable or rendering fails (the
    error is printed).
    """
    if not TOTP_AVAILABLE:
        return None

    try:
        # otpauth:// URI that authenticator apps understand.
        provisioning_uri = pyotp.TOTP(secret).provisioning_uri(
            name=username,
            issuer_name="ProxMenux Monitor"
        )
        qr_image = segno.make(provisioning_uri)

        # Serialize the QR code as SVG into an in-memory buffer.
        sink = io.BytesIO()
        qr_image.save(sink, kind='svg', scale=4, border=2)
        svg_text = sink.getvalue().decode('utf-8')

        # Wrap the SVG in a base64 data URL.
        encoded = base64.b64encode(svg_text.encode()).decode('utf-8')
        return "data:image/svg+xml;base64," + encoded
    except Exception as e:
        print(f"Error generating QR code: {e}")
        return None
|
|
|
|
|
|
def generate_backup_codes(count=8):
    """Create `count` 2FA recovery-code records.

    Each record is `{"code": <sha256 hex of "XXXX-XXXX">, "used": False}`.
    Only the hash is kept; the plaintext code is not part of the result.
    """
    alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
    records = []
    for _ in range(count):
        # Eight random characters, shown as XXXX-XXXX for readability.
        raw = ''.join(secrets.choice(alphabet) for _ in range(8))
        display = raw[:4] + "-" + raw[4:]
        digest = hashlib.sha256(display.encode()).hexdigest()
        records.append({"code": digest, "used": False})
    return records
|
|
|
|
|
|
def setup_totp(username):
    """
    Set up TOTP for a user.

    Generates a new TOTP secret, its QR code and eight backup codes, then
    stores the secret and the hashed backup codes in the config. This does
    not switch `totp_enabled` on.

    Args:
        username: Must match the username stored in the auth config.

    Returns (success: bool, secret: str, qr_code: str, backup_codes: list, message: str).
    `backup_codes` holds the plaintext codes, which are only returned here;
    the config keeps SHA-256 hashes of them.
    """
    if not TOTP_AVAILABLE:
        return False, None, None, None, "2FA is not available (pyotp/segno not installed)"

    config = load_auth_config()

    if not config.get("enabled"):
        return False, None, None, None, "Authentication must be enabled first"

    if config.get("username") != username:
        return False, None, None, None, "Invalid username"

    # Generate new secret and QR code
    secret = generate_totp_secret()
    qr_code = generate_totp_qr(username, secret)

    # Generate each backup code exactly once: the plaintext goes back to the
    # caller, its hash goes into the config. Previously a throwaway set was
    # generated first and then overwritten entry by entry.
    alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
    backup_codes_plain = []
    for _ in range(8):
        code = ''.join(secrets.choice(alphabet) for _ in range(8))
        backup_codes_plain.append(f"{code[:4]}-{code[4:]}")

    backup_codes_hashed = [
        {"code": hashlib.sha256(plain.encode()).hexdigest(), "used": False}
        for plain in backup_codes_plain
    ]

    # Store secret and hashed backup codes (not enabled yet until verified)
    config["totp_secret"] = secret
    config["backup_codes"] = backup_codes_hashed

    if save_auth_config(config):
        return True, secret, qr_code, backup_codes_plain, "2FA setup initiated"
    else:
        return False, None, None, None, "Failed to save 2FA configuration"
|
|
|
|
|
|
def verify_totp(username, token, use_backup=False):
    """
    Verify a TOTP token or backup code.

    Args:
        username: Must match the username stored in the auth config.
        token: The submitted TOTP code, or a backup code when `use_backup`
            is True.
        use_backup: Check `token` against the stored backup codes instead
            of the TOTP secret. A matching unused backup code is marked as
            used and the config is saved.

    Returns (success: bool, message: str)
    """
    if not TOTP_AVAILABLE and not use_backup:
        return False, "2FA is not available"

    config = load_auth_config()

    if not config.get("totp_enabled"):
        return False, "2FA is not enabled"

    if config.get("username") != username:
        return False, "Invalid username"

    # Check backup code
    if use_backup:
        token_hash = hashlib.sha256(token.encode()).hexdigest()
        for backup_code in config.get("backup_codes", []):
            if backup_code["code"] == token_hash and not backup_code["used"]:
                backup_code["used"] = True
                save_auth_config(config)
                return True, "Backup code accepted"
        return False, "Invalid or already used backup code"

    # Check TOTP token. `valid_window=1` accepts the previous, current and
    # next 30s timesteps, which is friendly to clock skew but lets a leaked
    # OTP be replayed for up to ~90s. Track the last successfully-used
    # timestep counter per account and reject anything <= that.
    import time as _time
    totp = pyotp.TOTP(config.get("totp_secret"))
    if not totp.verify(token, valid_window=1):
        return False, "Invalid 2FA code"

    # Find which counter the OTP corresponds to (one of current ± 1).
    interval = getattr(totp, 'interval', 30)
    current_counter = int(_time.time() // interval)
    submitted = str(token)
    matched_counter = None
    for c in (current_counter - 1, current_counter, current_counter + 1):
        try:
            # `TOTP.at()` takes a point in time, not a timestep counter, so
            # the code for counter `c` is derived with `generate_otp(c)`.
            if totp.generate_otp(c) == submitted:
                matched_counter = c
                break
        except Exception:
            continue
    if matched_counter is None:
        # `verify()` succeeded but we couldn't map to a counter — fail closed.
        return False, "Invalid 2FA code"

    last_counter = config.get("last_totp_counter", -1)
    if matched_counter <= last_counter:
        return False, "2FA code already used; wait for the next one"

    config["last_totp_counter"] = matched_counter
    save_auth_config(config)
    return True, "2FA verification successful"
|
|
|
|
|
|
def enable_totp(username, verification_token):
    """
    Turn 2FA on once the user proves the authenticator works.

    Requires a stored TOTP secret, a matching username and a
    `verification_token` that verifies against the secret (with
    `valid_window=1`).

    Returns (success: bool, message: str)
    """
    if not TOTP_AVAILABLE:
        return False, "2FA is not available"

    config = load_auth_config()
    stored_secret = config.get("totp_secret")

    if not stored_secret:
        return False, "2FA has not been set up. Please set up 2FA first."

    if config.get("username") != username:
        return False, "Invalid username"

    # The submitted code must verify before the flag is flipped.
    code_is_valid = pyotp.TOTP(config.get("totp_secret")).verify(verification_token, valid_window=1)
    if not code_is_valid:
        return False, "Invalid verification code. Please try again."

    config["totp_enabled"] = True

    if not save_auth_config(config):
        return False, "Failed to enable 2FA"
    return True, "2FA enabled successfully"
|
|
|
|
|
|
def disable_totp(username, password, totp_code=None):
    """
    Switch 2FA off (needs the password AND, while 2FA is active, a valid 2FA code).

    This used to require only the password, which meant an attacker who
    phished or replayed the password could turn off the user's second
    factor entirely. Per audit Tier 1 #10 and the related frontend finding
    (2FA could be disabled with the password alone), a valid TOTP code (or
    backup code) is now also demanded to disable the protection it
    represents.

    Returns (success: bool, message: str).
    """
    config = load_auth_config()

    if config.get("username") != username:
        return False, "Invalid username"

    if not verify_password(password, config.get("password_hash", "")):
        return False, "Invalid password"

    # While TOTP is active, the second factor is needed to turn it off.
    if config.get("totp_enabled"):
        if not totp_code:
            return False, "2FA code required to disable 2FA"

        second_factor_ok = (
            verify_totp(username, totp_code, use_backup=False)[0]
            or verify_totp(username, totp_code, use_backup=True)[0]
        )
        if not second_factor_ok:
            return False, "Invalid 2FA code"

        # Pick up anything verify_totp saved (e.g. a consumed backup code).
        config = load_auth_config()

    config.update({
        "totp_enabled": False,
        "totp_secret": None,
        "backup_codes": [],
    })

    if not save_auth_config(config):
        return False, "Failed to disable 2FA"
    return True, "2FA disabled successfully"
|
|
|
|
|
|
# ===================================================================
# SSL/HTTPS Certificate Management
# ===================================================================

# Location of the SSL settings file; the PROXMENUX_SSL_CONFIG environment
# variable overrides the default path.
SSL_CONFIG_FILE = Path(os.getenv("PROXMENUX_SSL_CONFIG", "/etc/proxmenux/ssl_config.json"))

# Default Proxmox certificate and key locations.
PROXMOX_CERT_PATH = "/etc/pve/local/pve-ssl.pem"
PROXMOX_KEY_PATH = "/etc/pve/local/pve-ssl.key"

# A custom certificate uploaded by the admin through the PVE UI is written
# to `pveproxy-ssl.pem` instead, and PVE itself prefers it. The same
# preference is applied here so `detect_proxmox_certificates` reports the
# certificate the user actually wants served. Issue #181.
PROXMOX_CUSTOM_CERT_PATH = "/etc/pve/local/pveproxy-ssl.pem"
PROXMOX_CUSTOM_KEY_PATH = "/etc/pve/local/pveproxy-ssl.key"
|
|
|
|
|
|
def _default_ssl_config():
    """Build the SSL settings used when nothing usable is stored on disk."""
    return {
        "enabled": False,
        "cert_path": "",
        "key_path": "",
        "source": "none"  # "none", "proxmox", "custom"
    }


def load_ssl_config():
    """Read the SSL settings from `SSL_CONFIG_FILE`.

    Returns the stored dict with missing keys filled in by defaults, or the
    all-default settings when the file is absent or cannot be read/parsed.
    """
    if not SSL_CONFIG_FILE.exists():
        return _default_ssl_config()

    try:
        with open(SSL_CONFIG_FILE, 'r') as handle:
            stored = json.load(handle)
        stored.setdefault("enabled", False)
        for text_key in ("cert_path", "key_path"):
            stored.setdefault(text_key, "")
        stored.setdefault("source", "none")
        return stored
    except Exception:
        return _default_ssl_config()
|
|
|
|
|
|
def save_ssl_config(config):
    """Write the SSL settings to `SSL_CONFIG_FILE` as indented JSON.

    The parent directory is created when missing. Returns True on success
    and False when anything raises (the error is printed).
    """
    try:
        target_dir = SSL_CONFIG_FILE.parent
        target_dir.mkdir(exist_ok=True, parents=True)
        with open(SSL_CONFIG_FILE, 'w') as handle:
            json.dump(config, handle, indent=2)
    except Exception as e:
        print(f"Error saving SSL config: {e}")
        return False
    return True
|
|
|
|
|
|
def detect_proxmox_certificates():
    """
    Look for a usable Proxmox certificate/key pair on disk.

    The custom-uploaded `pveproxy-ssl.pem` pair (what PVE itself uses when
    the admin uploaded a Let's Encrypt / commercial cert via the UI) is
    preferred; the default self-signed `pve-ssl.pem` pair is the fallback.
    Issue #181 - the detector used to find only pve-ssl.pem.

    Returns a dict with `proxmox_available`, `proxmox_cert`, `proxmox_key`
    and `cert_info`. `cert_info` is filled from `openssl x509` output when
    that command succeeds and stays None otherwise.
    """
    result = {
        "proxmox_available": False,
        "proxmox_cert": PROXMOX_CERT_PATH,
        "proxmox_key": PROXMOX_KEY_PATH,
        "cert_info": None
    }

    # Candidate pairs in order of preference; the first complete pair wins.
    candidates = (
        (PROXMOX_CUSTOM_CERT_PATH, PROXMOX_CUSTOM_KEY_PATH),
        (PROXMOX_CERT_PATH, PROXMOX_KEY_PATH),
    )
    for cert_file, key_file in candidates:
        if os.path.isfile(cert_file) and os.path.isfile(key_file):
            result["proxmox_cert"] = cert_file
            result["proxmox_key"] = key_file
            result["proxmox_available"] = True
            break

    if not result["proxmox_available"]:
        return result

    # Best effort: describe whichever certificate was picked. Any failure
    # here simply leaves `cert_info` as None.
    try:
        import subprocess

        openssl_cmd = [
            "openssl", "x509", "-in", result["proxmox_cert"],
            "-noout", "-subject", "-enddate", "-issuer",
        ]
        completed = subprocess.run(openssl_cmd, capture_output=True, text=True, timeout=5)
        if completed.returncode == 0:
            details = {}
            for row in completed.stdout.strip().split('\n'):
                if row.startswith("subject="):
                    details["subject"] = row.replace("subject=", "").strip()
                elif row.startswith("notAfter="):
                    details["expires"] = row.replace("notAfter=", "").strip()
                elif row.startswith("issuer="):
                    issuer_text = row.replace("issuer=", "").strip()
                    details["issuer"] = issuer_text
                    # Self-signed when the subject seen so far equals the issuer.
                    details["is_self_signed"] = details.get("subject", "") == issuer_text
            result["cert_info"] = details
    except Exception:
        pass

    return result
|
|
|
|
|
|
def validate_certificate_files(cert_path, key_path):
    """
    Validate that cert and key files exist, are readable PEM and belong together.

    Checks, in order:
      1. both paths are given and point to regular files;
      2. the certificate file contains a PEM certificate header and the key
         file contains a PEM private-key header;
      3. (best effort) the public key embedded in the certificate equals the
         public key derived from the private key, using the `openssl` CLI.
         This works for any key type (RSA, EC, Ed25519, ...). If openssl is
         missing, times out or cannot parse a file, the check is skipped.

    Returns (valid: bool, message: str)
    """
    if not cert_path or not key_path:
        return False, "Certificate and key paths are required"

    if not os.path.isfile(cert_path):
        return False, f"Certificate file not found: {cert_path}"

    if not os.path.isfile(key_path):
        return False, f"Key file not found: {key_path}"

    # PEM files may start with free-form text (e.g. "Bag Attributes" written
    # by PKCS#12 exports) before the BEGIN line, so sniff a generous window
    # instead of only the first few bytes.
    sniff_chars = 65536

    # Verify files are readable
    try:
        # errors="replace": stray non-UTF-8 bytes in comments must not make an
        # otherwise valid PEM file fail validation.
        with open(cert_path, 'r', errors='replace') as f:
            content = f.read(sniff_chars)
        if "BEGIN CERTIFICATE" not in content and "BEGIN TRUSTED CERTIFICATE" not in content:
            return False, "Certificate file does not appear to be a valid PEM certificate"

        with open(key_path, 'r', errors='replace') as f:
            content = f.read(sniff_chars)
        # Every PEM private-key header ("PRIVATE KEY", "RSA PRIVATE KEY",
        # "EC PRIVATE KEY", "ENCRYPTED PRIVATE KEY", ...) ends this way; a
        # public key is not acceptable here.
        if "-----BEGIN" not in content or "PRIVATE KEY-----" not in content:
            return False, "Key file does not appear to be a valid PEM key"
    except PermissionError:
        return False, "Cannot read certificate files. Check file permissions."
    except Exception as e:
        return False, f"Error reading certificate files: {str(e)}"

    # Verify cert and key match by comparing their public keys.
    try:
        import subprocess
        cert_pub = subprocess.run(
            ["openssl", "x509", "-in", cert_path, "-noout", "-pubkey"],
            capture_output=True, text=True, timeout=5
        )
        key_pub = subprocess.run(
            ["openssl", "pkey", "-in", key_path, "-pubout"],
            capture_output=True, text=True, timeout=5
        )
        if cert_pub.returncode == 0 and key_pub.returncode == 0:
            if cert_pub.stdout.strip() != key_pub.stdout.strip():
                return False, "Certificate and key do not match"
    except Exception:
        pass  # Non-critical, proceed anyway

    return True, "Certificate files are valid"
|
|
|
|
|
|
def configure_ssl(cert_path, key_path, source="custom"):
    """
    Enable SSL using the given certificate/key pair and persist the setting.

    The pair is first checked with `validate_certificate_files`; on failure
    its message is returned unchanged and nothing is saved. `source` is stored
    alongside the paths as-is.

    Returns (success: bool, message: str)
    """
    is_valid, reason = validate_certificate_files(cert_path, key_path)
    if not is_valid:
        return False, reason

    saved = save_ssl_config({
        "enabled": True,
        "cert_path": cert_path,
        "key_path": key_path,
        "source": source,
    })
    if not saved:
        return False, "Failed to save SSL configuration"

    return True, "SSL configured successfully. Restart the monitor service to apply changes."
|
|
|
|
|
|
def disable_ssl():
    """Persist an SSL-disabled configuration (plain HTTP).

    Overwrites the stored settings with empty certificate/key paths and
    source "none".

    Returns (success: bool, message: str)
    """
    saved = save_ssl_config({
        "enabled": False,
        "cert_path": "",
        "key_path": "",
        "source": "none",
    })
    if not saved:
        return False, "Failed to save SSL configuration"

    return True, "SSL disabled. Restart the monitor service to apply changes."
|
|
|
|
|
|
def get_ssl_context():
    """
    Resolve the certificate/key pair to serve HTTPS with, if any.

    Returns the tuple (cert_path, key_path) when SSL is enabled in the stored
    configuration and both files currently exist on disk; otherwise None.
    """
    settings = load_ssl_config()
    if not settings.get("enabled"):
        return None

    cert_file = settings.get("cert_path", "")
    key_file = settings.get("key_path", "")

    # Both paths must be set and point at existing regular files.
    if not (cert_file and key_file):
        return None
    if not os.path.isfile(cert_file):
        return None
    if not os.path.isfile(key_file):
        return None

    return (cert_file, key_file)
|
|
|
|
|
|
def authenticate(username, password, totp_token=None):
    """
    Authenticate a user with username, password, and optional TOTP.

    Steps, in order:
      1. Reject when authentication is not enabled in the auth config.
      2. Check the username and password; both failures return the same
         message.
      3. If the stored hash is in the legacy format, replace it with a fresh
         one (best effort; a save failure does not block the login).
      4. When 2FA is enabled, require and verify a TOTP code or backup code.
      5. Issue a token.

    Args:
        username: Login name supplied by the client.
        password: Cleartext password supplied by the client.
        totp_token: Optional TOTP code or backup code (formatted XXXX-XXXX).

    Returns (success: bool, token: str or None, requires_totp: bool, message: str)
    `requires_totp` is True only when the password was accepted and a 2FA
    code still has to be provided.
    """
    config = load_auth_config()

    if not config.get("enabled"):
        return False, None, False, "Authentication is not enabled"

    if username != config.get("username"):
        return False, None, False, "Invalid username or password"

    stored_hash = config.get("password_hash", "")
    if not verify_password(password, stored_hash):
        return False, None, False, "Invalid username or password"

    # Lazy migration: if the stored hash is the legacy unsalted SHA-256, replace
    # it with a fresh PBKDF2 hash now that we have the cleartext in hand. The
    # next login uses the new hash; the legacy code path stays around only as
    # the recognition entry in `verify_password`. Audit Tier 4 #23.
    upgraded = _maybe_rehash_password(password, stored_hash)
    if upgraded:
        config["password_hash"] = upgraded
        try:
            save_auth_config(config)
        except Exception as e:
            # Don't block login if persistence fails — the user is still
            # authenticated and we can rehash on a future login attempt.
            print(f"[auth] Failed to persist rehashed password: {e}")

    if config.get("totp_enabled"):
        if not totp_token:
            # First step: password OK, now request TOTP code (not a failure)
            return False, None, True, "2FA code required"

        # Normalize before inspecting the length: a non-string token (e.g. a
        # JSON number) would make len() raise TypeError, and surrounding
        # whitespace would stop a backup code from being recognised as one.
        code = totp_token if isinstance(totp_token, str) else str(totp_token)
        code = code.strip()

        # Verify TOTP token or backup code. Backup codes are formatted
        # XXXX-XXXX, i.e. exactly 9 characters.
        success, message = verify_totp(username, code, use_backup=len(code) == 9)
        if not success:
            # TOTP code is wrong: return requires_totp=False so the caller
            # logs it as a real authentication failure for Fail2Ban
            return False, None, False, "Invalid 2FA code"

    token = generate_token(username)
    if token:
        return True, token, False, "Authentication successful"
    return False, None, False, "Failed to generate authentication token"
|