mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-05-13 20:45:01 +00:00
673 lines
27 KiB
Python
673 lines
27 KiB
Python
"""
|
|
Flask Authentication Routes
|
|
Provides REST API endpoints for authentication management
|
|
"""
|
|
|
|
import logging
|
|
import logging.handlers
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from collections import defaultdict, deque
|
|
from flask import Blueprint, jsonify, request
|
|
import auth_manager
|
|
from jwt_middleware import require_auth
|
|
import jwt
|
|
import datetime
|
|
|
|
|
|
# ─── Login rate limiter (audit Tier 3 #21) ───────────────────────────────
|
|
#
|
|
# Limits failed-login storms even on installations without Fail2Ban. Sliding
|
|
# window: 5 attempts per IP per 5 minutes. After the limit, the endpoint
|
|
# returns 429 until the oldest attempt ages out of the window. Counts ALL
|
|
# /api/auth/login POSTs (we don't know success vs failure until after auth)
|
|
# — a legitimate user has ample headroom for typos.
|
|
class _LoginRateLimiter:
|
|
def __init__(self, max_attempts=5, window_seconds=300):
|
|
self._max = max_attempts
|
|
self._window = window_seconds
|
|
self._buckets = defaultdict(deque) # ip -> deque[ts]
|
|
self._lock = threading.Lock()
|
|
|
|
def check_and_record(self, ip):
|
|
"""Returns (allowed: bool, retry_after_seconds: int)."""
|
|
if not ip:
|
|
ip = "unknown"
|
|
now = time.time()
|
|
cutoff = now - self._window
|
|
with self._lock:
|
|
bucket = self._buckets[ip]
|
|
# Drop stale entries
|
|
while bucket and bucket[0] < cutoff:
|
|
bucket.popleft()
|
|
if len(bucket) >= self._max:
|
|
# Reject; advise client when to try again.
|
|
retry = max(1, int(self._window - (now - bucket[0])))
|
|
return False, retry
|
|
bucket.append(now)
|
|
# Bound memory in pathological scans by reaping idle IPs occasionally.
|
|
if len(self._buckets) > 1024:
|
|
stale = [k for k, q in self._buckets.items() if not q or q[-1] < cutoff]
|
|
for k in stale:
|
|
self._buckets.pop(k, None)
|
|
return True, 0
|
|
|
|
|
|
_login_limiter = _LoginRateLimiter(max_attempts=5, window_seconds=300)
|
|
|
|
# Dedicated logger for auth failures (Fail2Ban reads this file)
|
|
auth_logger = logging.getLogger("proxmenux-auth")
|
|
auth_logger.setLevel(logging.WARNING)
|
|
|
|
# Handler 1: File for Fail2Ban
|
|
_auth_file_handler = logging.FileHandler("/var/log/proxmenux-auth.log")
|
|
_auth_file_handler.setFormatter(logging.Formatter("%(asctime)s proxmenux-auth: %(message)s"))
|
|
auth_logger.addHandler(_auth_file_handler)
|
|
|
|
# Handler 2: Syslog for JournalWatcher notifications
|
|
# This sends to the systemd journal so notification_events.py can detect auth failures
|
|
try:
|
|
_auth_syslog_handler = logging.handlers.SysLogHandler(address='/dev/log', facility=logging.handlers.SysLogHandler.LOG_AUTH)
|
|
_auth_syslog_handler.setFormatter(logging.Formatter("proxmenux-auth: %(message)s"))
|
|
_auth_syslog_handler.ident = "proxmenux-auth"
|
|
auth_logger.addHandler(_auth_syslog_handler)
|
|
except Exception:
|
|
pass # Syslog may not be available in all environments
|
|
|
|
|
|
# Only honor XFF when the operator has explicitly opted in via env var.
|
|
# Without this, a remote client can send `X-Forwarded-For: 1.2.3.4` to make
|
|
# each failed login look like it came from a different IP, defeating the
|
|
# Fail2Ban brute-force jail and polluting the auth log used by F2B. See
|
|
# audit Tier 3 #20.
|
|
_TRUST_PROXY = os.environ.get("PROXMENUX_TRUST_PROXY", "0") == "1"
|
|
|
|
|
|
def _get_client_ip():
|
|
"""Get the real client IP. Honors XFF/X-Real-IP only when PROXMENUX_TRUST_PROXY=1."""
|
|
if _TRUST_PROXY:
|
|
forwarded = request.headers.get("X-Forwarded-For", "")
|
|
if forwarded:
|
|
# First IP in the chain is the real client
|
|
return forwarded.split(",")[0].strip()
|
|
real_ip = request.headers.get("X-Real-IP", "")
|
|
if real_ip:
|
|
return real_ip.strip()
|
|
return request.remote_addr or "unknown"
|
|
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
@auth_bp.route('/api/auth/status', methods=['GET'])
|
|
def auth_status():
|
|
"""Get current authentication status"""
|
|
try:
|
|
status = auth_manager.get_auth_status()
|
|
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
if token:
|
|
username = auth_manager.verify_token(token)
|
|
if username:
|
|
status['authenticated'] = True
|
|
|
|
return jsonify(status)
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
# -------------------------------------------------------------------
|
|
# SSL/HTTPS Certificate Management
|
|
# -------------------------------------------------------------------
|
|
|
|
@auth_bp.route('/api/ssl/status', methods=['GET'])
|
|
def ssl_status():
|
|
"""Get current SSL configuration status and detect available certificates"""
|
|
try:
|
|
config = auth_manager.load_ssl_config()
|
|
detection = auth_manager.detect_proxmox_certificates()
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"ssl_enabled": config.get("enabled", False),
|
|
"source": config.get("source", "none"),
|
|
"cert_path": config.get("cert_path", ""),
|
|
"key_path": config.get("key_path", ""),
|
|
"proxmox_available": detection.get("proxmox_available", False),
|
|
"proxmox_cert": detection.get("proxmox_cert", ""),
|
|
"proxmox_key": detection.get("proxmox_key", ""),
|
|
"cert_info": detection.get("cert_info")
|
|
})
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
def _schedule_service_restart(delay=1.5):
|
|
"""Schedule a restart of the monitor service via systemctl after a short delay.
|
|
This gives time for the HTTP response to reach the client before the process restarts."""
|
|
def _do_restart():
|
|
time.sleep(delay)
|
|
print("[ProxMenux] Restarting monitor service to apply SSL changes...")
|
|
# Use systemctl restart which properly stops and starts the service.
|
|
# This works because systemd manages proxmenux-monitor.service.
|
|
try:
|
|
subprocess.Popen(
|
|
["systemctl", "restart", "proxmenux-monitor"],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL
|
|
)
|
|
except Exception as e:
|
|
print(f"[ProxMenux] Failed to restart via systemctl: {e}")
|
|
# Fallback: try to restart the process directly
|
|
os.kill(os.getpid(), 15) # SIGTERM
|
|
|
|
t = threading.Thread(target=_do_restart, daemon=True)
|
|
t.start()
|
|
|
|
|
|
@auth_bp.route('/api/ssl/configure', methods=['POST'])
|
|
@require_auth
|
|
def ssl_configure():
|
|
"""Configure SSL with Proxmox or custom certificates"""
|
|
try:
|
|
data = request.json or {}
|
|
source = data.get("source", "proxmox")
|
|
auto_restart = data.get("auto_restart", True)
|
|
|
|
if source == "proxmox":
|
|
# Sprint 11.8 / Issue #181: prefer the ACME-uploaded cert
|
|
# (pveproxy-ssl.pem) over the self-signed default (pve-ssl.pem)
|
|
# by going through the detector. detect_proxmox_certificates()
|
|
# returns the path PVE itself uses, which is what the user sees
|
|
# in the "Available" status — `ssl_configure` was hard-coding
|
|
# the self-signed default and silently downgrading the cert.
|
|
detection = auth_manager.detect_proxmox_certificates()
|
|
if detection.get("proxmox_available"):
|
|
cert_path = detection.get("proxmox_cert") or auth_manager.PROXMOX_CERT_PATH
|
|
key_path = detection.get("proxmox_key") or auth_manager.PROXMOX_KEY_PATH
|
|
else:
|
|
cert_path = auth_manager.PROXMOX_CERT_PATH
|
|
key_path = auth_manager.PROXMOX_KEY_PATH
|
|
elif source == "custom":
|
|
cert_path = data.get("cert_path", "")
|
|
key_path = data.get("key_path", "")
|
|
else:
|
|
return jsonify({"success": False, "message": "Invalid source. Use 'proxmox' or 'custom'."}), 400
|
|
|
|
success, message = auth_manager.configure_ssl(cert_path, key_path, source)
|
|
|
|
if success:
|
|
# Issue #194 cross-detection: if the user already configured
|
|
# the PVE notifications webhook, the registered URL still
|
|
# points at `http://...`. Re-register it now (before the
|
|
# service restart) so PVE picks up the new https:// scheme
|
|
# the moment Flask comes back up. NO-OP when no webhook is
|
|
# registered yet.
|
|
_refresh_pve_webhook_for_ssl_change()
|
|
|
|
if auto_restart:
|
|
_schedule_service_restart()
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "SSL enabled. The service is restarting...",
|
|
"restarting": auto_restart,
|
|
"new_protocol": "https"
|
|
})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/ssl/disable', methods=['POST'])
|
|
@require_auth
|
|
def ssl_disable():
|
|
"""Disable SSL and return to HTTP"""
|
|
try:
|
|
data = request.json or {}
|
|
auto_restart = data.get("auto_restart", True)
|
|
|
|
success, message = auth_manager.disable_ssl()
|
|
|
|
if success:
|
|
# Same cross-detection as `ssl_configure`: rewrite the PVE
|
|
# webhook URL back to http:// so PVE doesn't keep posting
|
|
# to an https:// endpoint that no longer answers.
|
|
_refresh_pve_webhook_for_ssl_change()
|
|
|
|
if auto_restart:
|
|
_schedule_service_restart()
|
|
return jsonify({
|
|
"success": True,
|
|
"message": "SSL disabled. The service is restarting...",
|
|
"restarting": auto_restart,
|
|
"new_protocol": "http"
|
|
})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
def _refresh_pve_webhook_for_ssl_change():
|
|
"""Helper used by both `ssl_configure` and `ssl_disable`.
|
|
|
|
Wraps the deferred import and the try/except so an unrelated
|
|
notifications-stack hiccup never fails the SSL toggle itself.
|
|
Logs but doesn't raise on any error path.
|
|
"""
|
|
try:
|
|
from flask_notification_routes import refresh_pve_webhook_url_if_registered
|
|
result = refresh_pve_webhook_url_if_registered()
|
|
if result.get('skipped'):
|
|
return # Nothing to do — no webhook registered yet.
|
|
if result.get('error'):
|
|
print(f"[ssl] webhook refresh after SSL change had a non-fatal "
|
|
f"error: {result['error']}")
|
|
except Exception as e:
|
|
print(f"[ssl] failed to refresh PVE webhook after SSL change: {e}")
|
|
|
|
|
|
@auth_bp.route('/api/ssl/validate', methods=['POST'])
|
|
@require_auth
|
|
def ssl_validate():
|
|
"""Validate custom certificate and key file paths"""
|
|
try:
|
|
data = request.json or {}
|
|
cert_path = data.get("cert_path", "")
|
|
key_path = data.get("key_path", "")
|
|
|
|
valid, message = auth_manager.validate_certificate_files(cert_path, key_path)
|
|
|
|
return jsonify({"success": valid, "message": message})
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
|
|
@auth_bp.route('/api/auth/decline', methods=['POST'])
|
|
def auth_decline():
|
|
"""Decline authentication setup.
|
|
|
|
Reachable without auth so a fresh install can opt out before any user is
|
|
created — but ONCE auth has been configured, this endpoint must reject:
|
|
otherwise an unauth attacker can `decline` post-setup and turn off the
|
|
requirement to authenticate. See audit Tier 1 #5.
|
|
"""
|
|
try:
|
|
if auth_manager.load_auth_config().get("configured", False):
|
|
return jsonify({
|
|
"success": False,
|
|
"message": "Authentication is already configured; cannot decline."
|
|
}), 403
|
|
success, message = auth_manager.decline_auth()
|
|
|
|
if success:
|
|
return jsonify({"success": True, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/login', methods=['POST'])
|
|
def auth_login():
|
|
"""Authenticate user and return JWT token"""
|
|
try:
|
|
# Application-level rate limit (5 tries per IP per 5 min). Hits BEFORE
|
|
# auth so the cost of the attempt — bcrypt-equivalent password check
|
|
# plus DB read — isn't paid by the attacker. Audit Tier 3 #21.
|
|
client_ip = _get_client_ip()
|
|
allowed, retry_after = _login_limiter.check_and_record(client_ip)
|
|
if not allowed:
|
|
auth_logger.warning(
|
|
"login rate limit exceeded; rhost=%s retry_after=%ds",
|
|
client_ip, retry_after,
|
|
)
|
|
return jsonify({
|
|
"success": False,
|
|
"message": "Too many login attempts. Please wait and try again.",
|
|
"retry_after": retry_after,
|
|
}), 429
|
|
|
|
data = request.json
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
totp_token = data.get('totp_token') # Optional 2FA token
|
|
|
|
success, token, requires_totp, message = auth_manager.authenticate(username, password, totp_token)
|
|
|
|
if success:
|
|
return jsonify({"success": True, "token": token, "message": message})
|
|
elif requires_totp:
|
|
# First step: password OK, requesting TOTP code (not a failure)
|
|
return jsonify({"success": False, "requires_totp": True, "message": message}), 200
|
|
else:
|
|
# Authentication failure (wrong password or wrong TOTP code).
|
|
# `client_ip` was already resolved at the top for rate-limiting.
|
|
auth_logger.warning(
|
|
"authentication failure; rhost=%s user=%s",
|
|
client_ip, username or "unknown"
|
|
)
|
|
# If user submitted a TOTP token that was wrong, tell frontend
|
|
# to keep showing the TOTP field (not go back to password step)
|
|
is_totp_failure = totp_token and "2FA" in message
|
|
return jsonify({
|
|
"success": False,
|
|
"message": message,
|
|
"requires_totp": is_totp_failure
|
|
}), 401
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/setup', methods=['POST'])
|
|
def auth_setup():
|
|
"""Set up authentication with username and password (create user + enable auth)"""
|
|
try:
|
|
data = request.json
|
|
username = data.get('username')
|
|
password = data.get('password')
|
|
|
|
success, message = auth_manager.setup_auth(username, password)
|
|
|
|
if success:
|
|
# Generate a token so the user is logged in immediately
|
|
token = auth_manager.generate_token(username)
|
|
return jsonify({"success": True, "token": token, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "error": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/enable', methods=['POST'])
|
|
def auth_enable():
|
|
"""Enable authentication (must already be configured)"""
|
|
try:
|
|
success, message = auth_manager.enable_auth()
|
|
|
|
if success:
|
|
return jsonify({"success": True, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/disable', methods=['POST'])
|
|
def auth_disable():
|
|
"""Disable authentication"""
|
|
try:
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
if not token or not auth_manager.verify_token(token):
|
|
return jsonify({"success": False, "message": "Unauthorized"}), 401
|
|
|
|
success, message = auth_manager.disable_auth()
|
|
|
|
if success:
|
|
return jsonify({"success": True, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/change-password', methods=['POST'])
|
|
@require_auth
|
|
def auth_change_password():
|
|
"""Change authentication password.
|
|
|
|
Accepts an optional `totp_code` in the JSON body. When the account has
|
|
2FA enabled, that code is mandatory — see auth_manager.change_password.
|
|
"""
|
|
try:
|
|
data = request.json or {}
|
|
old_password = data.get('old_password')
|
|
new_password = data.get('new_password')
|
|
totp_code = data.get('totp_code')
|
|
|
|
success, message = auth_manager.change_password(old_password, new_password, totp_code)
|
|
|
|
if success:
|
|
return jsonify({"success": True, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/skip', methods=['POST'])
|
|
def auth_skip():
|
|
"""Skip authentication setup (same as decline).
|
|
|
|
Same hardening as /api/auth/decline: once auth is configured, this is
|
|
locked. See audit Tier 1 #5.
|
|
"""
|
|
try:
|
|
if auth_manager.load_auth_config().get("configured", False):
|
|
return jsonify({
|
|
"success": False,
|
|
"message": "Authentication is already configured; cannot skip."
|
|
}), 403
|
|
success, message = auth_manager.decline_auth()
|
|
|
|
if success:
|
|
# Return success with clear indication that APIs should be accessible
|
|
return jsonify({
|
|
"success": True,
|
|
"message": message,
|
|
"auth_declined": True # Add explicit flag for frontend
|
|
})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/totp/setup', methods=['POST'])
|
|
def totp_setup():
|
|
"""Initialize TOTP setup for a user"""
|
|
try:
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
username = auth_manager.verify_token(token)
|
|
|
|
if not username:
|
|
return jsonify({"success": False, "message": "Unauthorized"}), 401
|
|
|
|
success, secret, qr_code, backup_codes, message = auth_manager.setup_totp(username)
|
|
|
|
if success:
|
|
return jsonify({
|
|
"success": True,
|
|
"secret": secret,
|
|
"qr_code": qr_code,
|
|
"backup_codes": backup_codes,
|
|
"message": message
|
|
})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/totp/enable', methods=['POST'])
|
|
def totp_enable():
|
|
"""Enable TOTP after verification"""
|
|
try:
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
username = auth_manager.verify_token(token)
|
|
|
|
if not username:
|
|
return jsonify({"success": False, "message": "Unauthorized"}), 401
|
|
|
|
data = request.json
|
|
verification_token = data.get('token')
|
|
|
|
if not verification_token:
|
|
return jsonify({"success": False, "message": "Verification token required"}), 400
|
|
|
|
success, message = auth_manager.enable_totp(username, verification_token)
|
|
|
|
if success:
|
|
return jsonify({"success": True, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/totp/disable', methods=['POST'])
|
|
def totp_disable():
|
|
"""Disable TOTP (requires password confirmation)"""
|
|
try:
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
username = auth_manager.verify_token(token)
|
|
|
|
if not username:
|
|
return jsonify({"success": False, "message": "Unauthorized"}), 401
|
|
|
|
data = request.json or {}
|
|
password = data.get('password')
|
|
totp_code = data.get('totp_code')
|
|
|
|
if not password:
|
|
return jsonify({"success": False, "message": "Password required"}), 400
|
|
|
|
success, message = auth_manager.disable_totp(username, password, totp_code)
|
|
|
|
if success:
|
|
return jsonify({"success": True, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/generate-api-token', methods=['POST'])
|
|
def generate_api_token():
|
|
"""Generate a long-lived API token for external integrations (Homepage, Home Assistant, etc.)"""
|
|
try:
|
|
# API tokens are scoped to a real authenticated user. Without
|
|
# auth configured there is no user to attach the token to —
|
|
# surface that as a 400 with a clear message rather than 401,
|
|
# so the UI can show "configure auth first" instead of bouncing
|
|
# the user to a login page that doesn't exist yet.
|
|
config = auth_manager.load_auth_config()
|
|
if not config.get("enabled", False) or config.get("declined", False):
|
|
return jsonify({"success": False, "message": "Authentication must be configured before generating API tokens"}), 400
|
|
|
|
auth_header = request.headers.get('Authorization', '')
|
|
token = auth_header.replace('Bearer ', '')
|
|
|
|
if not token:
|
|
return jsonify({"success": False, "message": "Unauthorized. Please log in first."}), 401
|
|
|
|
username = auth_manager.verify_token(token)
|
|
|
|
if not username:
|
|
return jsonify({"success": False, "message": "Invalid or expired session. Please log in again."}), 401
|
|
|
|
data = request.json
|
|
password = data.get('password')
|
|
totp_token = data.get('totp_token') # Optional 2FA token
|
|
token_name = data.get('token_name', 'API Token') # Optional token description
|
|
# `scope` narrows what the token can do. Defaults to `read_only` —
|
|
# which is the safe choice for the most common integration cases
|
|
# (Homepage / Home Assistant dashboards just read metrics). Caller
|
|
# can opt into `full_admin` explicitly. Audit Tier 6 — Tokens API
|
|
# JWT 365 días sin scope.
|
|
scope = data.get('scope', 'read_only')
|
|
if scope not in ('read_only', 'full_admin'):
|
|
return jsonify({"success": False, "message": "Invalid scope (read_only|full_admin)"}), 400
|
|
|
|
if not password:
|
|
return jsonify({"success": False, "message": "Password is required"}), 400
|
|
|
|
# Authenticate user with password and optional 2FA
|
|
success, _, requires_totp, message = auth_manager.authenticate(username, password, totp_token)
|
|
|
|
if success:
|
|
# Generate a long-lived token (1 year expiration)
|
|
# `auth_manager.JWT_SECRET` (capitalised constant) was removed when
|
|
# the per-install secret moved into `auth.json`; the helper
|
|
# `_get_jwt_secret()` is the public way to read it. Without this
|
|
# call the route AttributeError'd on every API-token generation.
|
|
# iss/aud match the values the verifier expects in Sprint 10E.
|
|
api_token = jwt.encode({
|
|
'username': username,
|
|
'token_name': token_name,
|
|
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=365),
|
|
'iat': datetime.datetime.utcnow(),
|
|
'iss': auth_manager.JWT_ISSUER,
|
|
'aud': auth_manager.JWT_AUDIENCE,
|
|
'scope': scope,
|
|
}, auth_manager._get_jwt_secret(), algorithm='HS256')
|
|
|
|
# Store token metadata for listing and revocation
|
|
auth_manager.store_api_token_metadata(api_token, token_name)
|
|
|
|
return jsonify({
|
|
"success": True,
|
|
"token": api_token,
|
|
"token_name": token_name,
|
|
"expires_in": "365 days",
|
|
"message": "API token generated successfully. Store this token securely, it will not be shown again."
|
|
})
|
|
elif requires_totp:
|
|
return jsonify({"success": False, "requires_totp": True, "message": message}), 200
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 401
|
|
except Exception as e:
|
|
print(f"[ERROR] generate_api_token: {str(e)}") # Log error for debugging
|
|
return jsonify({"success": False, "message": f"Internal error: {str(e)}"}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/api-tokens', methods=['GET'])
|
|
def list_api_tokens():
|
|
"""List all generated API tokens (metadata only, no actual token values).
|
|
|
|
When auth is not configured (fresh install) or has been declined, no
|
|
tokens can exist and the endpoint should return an empty list instead
|
|
of 401. Returning 401 here trips the frontend's `fetchApi` redirect
|
|
to `/`, which silently boots the user out of the Security page on
|
|
any host without auth set up — see bug reported 2026-05-07.
|
|
"""
|
|
try:
|
|
config = auth_manager.load_auth_config()
|
|
if not config.get("enabled", False) or config.get("declined", False):
|
|
return jsonify({"success": True, "tokens": []})
|
|
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
if not token or not auth_manager.verify_token(token):
|
|
return jsonify({"success": False, "message": "Unauthorized"}), 401
|
|
|
|
tokens = auth_manager.list_api_tokens()
|
|
return jsonify({"success": True, "tokens": tokens})
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|
|
|
|
|
|
@auth_bp.route('/api/auth/api-tokens/<token_id>', methods=['DELETE'])
|
|
def revoke_api_token_route(token_id):
|
|
"""Revoke an API token by its ID."""
|
|
try:
|
|
config = auth_manager.load_auth_config()
|
|
# Without configured auth there are no tokens to revoke; surface
|
|
# that as a clean 400 instead of an unhelpful 401.
|
|
if not config.get("enabled", False) or config.get("declined", False):
|
|
return jsonify({"success": False, "message": "Authentication is not configured"}), 400
|
|
|
|
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
|
if not token or not auth_manager.verify_token(token):
|
|
return jsonify({"success": False, "message": "Unauthorized"}), 401
|
|
|
|
success, message = auth_manager.revoke_api_token(token_id)
|
|
|
|
if success:
|
|
return jsonify({"success": True, "message": message})
|
|
else:
|
|
return jsonify({"success": False, "message": message}), 400
|
|
except Exception as e:
|
|
return jsonify({"success": False, "message": str(e)}), 500
|