Update jwt

This commit is contained in:
MacRimi
2026-03-15 20:43:05 +01:00
parent b7203b8219
commit 602afc2954
6 changed files with 25 additions and 192 deletions

View File

@@ -15,22 +15,12 @@ import secrets
from datetime import datetime, timedelta
from pathlib import Path
# Try PyJWT first, fall back to our simple implementation
try:
import jwt
JWT_AVAILABLE = True
JWT_BACKEND = "pyjwt"
except ImportError:
try:
# Use our simple JWT implementation (no external dependencies)
import simple_jwt as jwt
JWT_AVAILABLE = True
JWT_BACKEND = "simple_jwt"
print("Using simple_jwt backend (no cryptography dependency)")
except ImportError:
JWT_AVAILABLE = False
JWT_BACKEND = None
print("Warning: No JWT backend available. Authentication features will be limited.")
JWT_AVAILABLE = False
print("Warning: PyJWT not available. Authentication features will be limited.")
try:
import pyotp

View File

@@ -81,7 +81,6 @@ cp "$SCRIPT_DIR/flask_server.py" "$APP_DIR/usr/bin/"
cp "$SCRIPT_DIR/flask_auth_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_auth_routes.py not found"
cp "$SCRIPT_DIR/auth_manager.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ auth_manager.py not found"
cp "$SCRIPT_DIR/jwt_middleware.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ jwt_middleware.py not found"
cp "$SCRIPT_DIR/simple_jwt.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ simple_jwt.py not found"
cp "$SCRIPT_DIR/health_monitor.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ health_monitor.py not found"
cp "$SCRIPT_DIR/health_persistence.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ health_persistence.py not found"
cp "$SCRIPT_DIR/flask_health_routes.py" "$APP_DIR/usr/bin/" 2>/dev/null || echo "⚠️ flask_health_routes.py not found"
@@ -304,6 +303,7 @@ pip3 install --target "$APP_DIR/usr/lib/python3/dist-packages" \
h11==0.9.0 || true
# Phase 2: Install modern Flask/WebSocket dependencies (will upgrade h11 and related packages)
# Note: cryptography removed due to Python version compatibility issues (PyO3 modules)
pip3 install --target "$APP_DIR/usr/lib/python3/dist-packages" --upgrade --no-deps \
flask \
flask-cors \
@@ -312,8 +312,7 @@ pip3 install --target "$APP_DIR/usr/lib/python3/dist-packages" --upgrade --no-de
PyJWT \
pyotp \
segno \
beautifulsoup4 \
cryptography
beautifulsoup4
# Phase 3: Install WebSocket with newer h11
pip3 install --target "$APP_DIR/usr/lib/python3/dist-packages" --upgrade \

View File

@@ -11,13 +11,7 @@ import threading
import time
from flask import Blueprint, jsonify, request
import auth_manager
# Try PyJWT first, fall back to our simple implementation
try:
import jwt
except ImportError:
import simple_jwt as jwt
import jwt
import datetime
# Dedicated logger for auth failures (Fail2Ban reads this file)

View File

@@ -29,12 +29,7 @@ from datetime import datetime, timedelta
from functools import wraps
from pathlib import Path
# Try PyJWT first, fall back to our simple implementation (no cryptography dependency)
try:
import jwt
except ImportError:
import simple_jwt as jwt
import jwt
import psutil
from flask import Flask, jsonify, request, send_file, send_from_directory, Response
from flask_cors import CORS

View File

@@ -24,12 +24,10 @@ from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Optional: cryptography for encryption
try:
from cryptography.fernet import Fernet
ENCRYPTION_AVAILABLE = True
except ImportError:
ENCRYPTION_AVAILABLE = False
# Note: We use a simple XOR-based encryption for local token storage
# This avoids the cryptography dependency which has Python version compatibility issues
# (PyO3 modules compiled for specific Python versions cause ImportError on different versions)
ENCRYPTION_AVAILABLE = False
# Logging
logger = logging.getLogger("proxmenux.oci")
@@ -66,10 +64,8 @@ def _get_or_create_encryption_key() -> bytes:
with open(ENCRYPTION_KEY_FILE, 'rb') as f:
return f.read()
if ENCRYPTION_AVAILABLE:
key = Fernet.generate_key()
else:
key = secrets.token_bytes(32)
# Generate a 32-byte random key
key = secrets.token_bytes(32)
os.makedirs(os.path.dirname(ENCRYPTION_KEY_FILE), exist_ok=True)
with open(ENCRYPTION_KEY_FILE, 'wb') as f:
@@ -80,20 +76,20 @@ def _get_or_create_encryption_key() -> bytes:
def encrypt_sensitive_value(value: str) -> str:
"""Encrypt a sensitive value. Returns base64-encoded string with 'ENC:' prefix."""
"""
Encrypt a sensitive value using XOR with a random key.
Returns base64-encoded string with 'ENC:' prefix.
Note: Uses XOR encryption which is sufficient for local token storage
and avoids cryptography library compatibility issues across Python versions.
"""
if not value:
return value
key = _get_or_create_encryption_key()
if ENCRYPTION_AVAILABLE:
f = Fernet(key)
encrypted = f.encrypt(value.encode())
return "ENC:" + encrypted.decode()
else:
value_bytes = value.encode()
encrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(value_bytes))
return "ENC:" + base64.b64encode(encrypted).decode()
value_bytes = value.encode()
encrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(value_bytes))
return "ENC:" + base64.b64encode(encrypted).decode()
def decrypt_sensitive_value(encrypted: str) -> str:
@@ -105,14 +101,9 @@ def decrypt_sensitive_value(encrypted: str) -> str:
key = _get_or_create_encryption_key()
try:
if ENCRYPTION_AVAILABLE:
f = Fernet(key)
decrypted = f.decrypt(encrypted_data.encode())
return decrypted.decode()
else:
encrypted_bytes = base64.b64decode(encrypted_data)
decrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(encrypted_bytes))
return decrypted.decode()
encrypted_bytes = base64.b64decode(encrypted_data)
decrypted = bytes(v ^ key[i % len(key)] for i, v in enumerate(encrypted_bytes))
return decrypted.decode()
except Exception as e:
logger.error(f"Failed to decrypt value: {e}")
return encrypted

View File

@@ -1,136 +0,0 @@
"""
Simple JWT Implementation
A minimal JWT implementation using only Python standard library.
Supports HS256 algorithm without requiring cryptography or PyJWT.
This ensures compatibility across all Python versions and systems.
"""
import hmac
import hashlib
import base64
import json
import time
from typing import Optional, Dict, Any
class ExpiredSignatureError(Exception):
"""Token has expired"""
pass
class InvalidTokenError(Exception):
"""Token is invalid"""
pass
def _base64url_encode(data: bytes) -> str:
"""Encode bytes to base64url string (no padding)"""
return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8')
def _base64url_decode(data: str) -> bytes:
"""Decode base64url string to bytes"""
# Add padding if needed
padding = 4 - len(data) % 4
if padding != 4:
data += '=' * padding
return base64.urlsafe_b64decode(data.encode('utf-8'))
def encode(payload: Dict[str, Any], secret: str, algorithm: str = "HS256") -> str:
"""
Encode a payload into a JWT token.
Args:
payload: Dictionary containing the claims
secret: Secret key for signing
algorithm: Algorithm to use (only HS256 supported)
Returns:
JWT token string
"""
if algorithm != "HS256":
raise ValueError(f"Algorithm {algorithm} not supported. Only HS256 is available.")
# Header
header = {"typ": "JWT", "alg": "HS256"}
header_b64 = _base64url_encode(json.dumps(header, separators=(',', ':')).encode('utf-8'))
# Payload
payload_b64 = _base64url_encode(json.dumps(payload, separators=(',', ':')).encode('utf-8'))
# Signature
message = f"{header_b64}.{payload_b64}"
signature = hmac.new(
secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).digest()
signature_b64 = _base64url_encode(signature)
return f"{header_b64}.{payload_b64}.{signature_b64}"
def decode(token: str, secret: str, algorithms: list = None) -> Dict[str, Any]:
"""
Decode and verify a JWT token.
Args:
token: JWT token string
secret: Secret key for verification
algorithms: List of allowed algorithms (ignored, only HS256 supported)
Returns:
Decoded payload dictionary
Raises:
InvalidTokenError: If token is malformed or signature is invalid
ExpiredSignatureError: If token has expired
"""
try:
parts = token.split('.')
if len(parts) != 3:
raise InvalidTokenError("Token must have 3 parts")
header_b64, payload_b64, signature_b64 = parts
# Verify signature
message = f"{header_b64}.{payload_b64}"
expected_signature = hmac.new(
secret.encode('utf-8'),
message.encode('utf-8'),
hashlib.sha256
).digest()
actual_signature = _base64url_decode(signature_b64)
if not hmac.compare_digest(expected_signature, actual_signature):
raise InvalidTokenError("Signature verification failed")
# Decode payload
payload = json.loads(_base64url_decode(payload_b64).decode('utf-8'))
# Check expiration
if 'exp' in payload:
if time.time() > payload['exp']:
raise ExpiredSignatureError("Token has expired")
return payload
except (ValueError, KeyError, json.JSONDecodeError) as e:
raise InvalidTokenError(f"Invalid token format: {e}")
# Compatibility aliases for PyJWT interface
class PyJWTCompat:
"""Compatibility class to mimic PyJWT interface"""
ExpiredSignatureError = ExpiredSignatureError
InvalidTokenError = InvalidTokenError
@staticmethod
def encode(payload, secret, algorithm="HS256"):
return encode(payload, secret, algorithm)
@staticmethod
def decode(token, secret, algorithms=None):
return decode(token, secret, algorithms)