mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-05-31 20:44:42 +00:00
Update AppImage 1.2.1.3
This commit is contained in:
@@ -39,6 +39,20 @@ except ImportError:
|
||||
# Configuration
|
||||
CONFIG_DIR = Path.home() / ".config" / "proxmenux-monitor"
|
||||
AUTH_CONFIG_FILE = CONFIG_DIR / "auth.json"
|
||||
|
||||
# User profile — Fase 2 (v1.2.2). Avatar stored as a binary file next
|
||||
# to auth.json so the JSON stays small and the image can be served
|
||||
# unmodified. Display name is kept inside auth.json as an optional
|
||||
# string; empty/missing falls back to the username at render time.
|
||||
AVATAR_FILE = CONFIG_DIR / "avatar.bin"
|
||||
AVATAR_CONTENT_TYPE_FILE = CONFIG_DIR / "avatar.type"
|
||||
AVATAR_MAX_BYTES = 2 * 1024 * 1024 # 2 MB hard cap on uploads
|
||||
AVATAR_ALLOWED_CONTENT_TYPES = {
|
||||
"image/png",
|
||||
"image/jpeg",
|
||||
"image/webp",
|
||||
"image/gif",
|
||||
}
|
||||
# Sentinel for legacy installs that started under the hardcoded JWT_SECRET.
|
||||
# The audit (Tier 4 #22) flagged that constant — anyone with access to the
|
||||
# public repo could forge JWTs against any deployment. We now generate a
|
||||
@@ -97,7 +111,8 @@ def load_auth_config():
|
||||
"totp_secret": None,
|
||||
"backup_codes": [],
|
||||
"api_tokens": [],
|
||||
"revoked_tokens": []
|
||||
"revoked_tokens": [],
|
||||
"display_name": None,
|
||||
}
|
||||
|
||||
try:
|
||||
@@ -111,6 +126,7 @@ def load_auth_config():
|
||||
config.setdefault("backup_codes", [])
|
||||
config.setdefault("api_tokens", [])
|
||||
config.setdefault("revoked_tokens", [])
|
||||
config.setdefault("display_name", None)
|
||||
return config
|
||||
except Exception as e:
|
||||
print(f"Error loading auth config: {e}")
|
||||
@@ -124,7 +140,8 @@ def load_auth_config():
|
||||
"totp_secret": None,
|
||||
"backup_codes": [],
|
||||
"api_tokens": [],
|
||||
"revoked_tokens": []
|
||||
"revoked_tokens": [],
|
||||
"display_name": None,
|
||||
}
|
||||
|
||||
|
||||
@@ -1280,3 +1297,168 @@ def authenticate(username, password, totp_token=None):
|
||||
return True, token, False, "Authentication successful"
|
||||
else:
|
||||
return False, None, False, "Failed to generate authentication token"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# User profile (Fase 2, v1.2.2)
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# Display name + avatar. Both are optional decorations on top of the
|
||||
# existing username + password. The display name lives inside auth.json
|
||||
# (one extra string field). The avatar is stored as a binary file next
|
||||
# to auth.json so the JSON stays small and the image can be served
|
||||
# without re-encoding.
|
||||
#
|
||||
# No email field — the Monitor doesn't send mail (no password reset, no
|
||||
# confirmation), and the operator-of-PVE-as-root use case never benefits
|
||||
# from one. If OIDC lands in v1.3.0 we'll surface whatever the issuer
|
||||
# claims, but we don't ask the operator for an email manually.
|
||||
|
||||
|
||||
def get_user_profile():
|
||||
"""Return the active user's profile decorations.
|
||||
|
||||
Returns a dict with:
|
||||
{
|
||||
"username": str | None,
|
||||
"display_name": str | None, # may equal username
|
||||
"has_avatar": bool,
|
||||
"avatar_mtime": float | None, # for cache-busting URLs
|
||||
"avatar_content_type": str | None,
|
||||
}
|
||||
Username falls back to None when auth isn't configured/enabled.
|
||||
"""
|
||||
config = load_auth_config()
|
||||
username = config.get("username") if config.get("enabled") else None
|
||||
display_name = config.get("display_name") or None
|
||||
|
||||
has_avatar = AVATAR_FILE.exists() and AVATAR_FILE.stat().st_size > 0
|
||||
avatar_mtime = None
|
||||
avatar_content_type = None
|
||||
if has_avatar:
|
||||
try:
|
||||
avatar_mtime = AVATAR_FILE.stat().st_mtime
|
||||
except OSError:
|
||||
avatar_mtime = None
|
||||
try:
|
||||
if AVATAR_CONTENT_TYPE_FILE.exists():
|
||||
avatar_content_type = AVATAR_CONTENT_TYPE_FILE.read_text().strip() or None
|
||||
except OSError:
|
||||
avatar_content_type = None
|
||||
|
||||
return {
|
||||
"username": username,
|
||||
"display_name": display_name,
|
||||
"has_avatar": has_avatar,
|
||||
"avatar_mtime": avatar_mtime,
|
||||
"avatar_content_type": avatar_content_type,
|
||||
}
|
||||
|
||||
|
||||
def set_display_name(display_name):
|
||||
"""Persist (or clear) the user's display name.
|
||||
|
||||
Accepts any string up to 64 chars. An empty / whitespace-only value
|
||||
clears the field — the dropdown then falls back to the raw username
|
||||
when rendering. Returns (success: bool, message: str).
|
||||
"""
|
||||
cleaned = (display_name or "").strip()
|
||||
if len(cleaned) > 64:
|
||||
return False, "Display name must be 64 characters or less"
|
||||
# Disallow control characters — a display name with embedded \n
|
||||
# would break the avatar dropdown layout.
|
||||
if any(ord(ch) < 0x20 for ch in cleaned):
|
||||
return False, "Display name contains control characters"
|
||||
|
||||
config = load_auth_config()
|
||||
config["display_name"] = cleaned or None
|
||||
if not save_auth_config(config):
|
||||
return False, "Failed to save profile"
|
||||
return True, "Display name updated"
|
||||
|
||||
|
||||
def save_avatar(content_bytes, content_type):
|
||||
"""Persist a new avatar image. Best-effort validation:
|
||||
|
||||
• Content-Type must be one of `AVATAR_ALLOWED_CONTENT_TYPES`.
|
||||
• Size must be <= `AVATAR_MAX_BYTES` (2 MB).
|
||||
• Magic-number check — first few bytes must match a supported image
|
||||
format. This blocks a `.png`-renamed `.exe` from being served as
|
||||
an image to other browsers.
|
||||
|
||||
Returns (success: bool, message: str). Does not resize — the
|
||||
frontend always renders the avatar inside a `rounded-full` with
|
||||
`object-cover`, so any aspect ratio displays correctly. Operators
|
||||
who want a smaller file can compress before upload.
|
||||
"""
|
||||
if not isinstance(content_bytes, (bytes, bytearray)) or not content_bytes:
|
||||
return False, "No image data"
|
||||
if len(content_bytes) > AVATAR_MAX_BYTES:
|
||||
return False, f"Image exceeds {AVATAR_MAX_BYTES // (1024 * 1024)} MB limit"
|
||||
if content_type not in AVATAR_ALLOWED_CONTENT_TYPES:
|
||||
return False, f"Unsupported image type: {content_type}"
|
||||
|
||||
# Magic-number sniffing: trust the Content-Type but verify.
|
||||
head = bytes(content_bytes[:12])
|
||||
looks_valid = (
|
||||
head.startswith(b"\x89PNG\r\n\x1a\n") or # PNG
|
||||
head.startswith(b"\xff\xd8\xff") or # JPEG
|
||||
(head[:4] == b"RIFF" and head[8:12] == b"WEBP") or # WebP
|
||||
head.startswith(b"GIF87a") or head.startswith(b"GIF89a") # GIF
|
||||
)
|
||||
if not looks_valid:
|
||||
return False, "Image bytes don't match a supported format"
|
||||
|
||||
try:
|
||||
ensure_config_dir()
|
||||
# Write atomically — tmp + rename so a crashed write never leaves
|
||||
# a half-written avatar file that the GET endpoint would serve as
|
||||
# corrupt bytes.
|
||||
tmp_avatar = AVATAR_FILE.with_suffix(AVATAR_FILE.suffix + ".tmp")
|
||||
with open(tmp_avatar, "wb") as f:
|
||||
f.write(content_bytes)
|
||||
os.replace(tmp_avatar, AVATAR_FILE)
|
||||
AVATAR_CONTENT_TYPE_FILE.write_text(content_type)
|
||||
try:
|
||||
os.chmod(AVATAR_FILE, 0o600)
|
||||
except OSError:
|
||||
# Best-effort permission tighten; not fatal if the FS doesn't
|
||||
# support it (e.g. some bind-mounted scenarios).
|
||||
pass
|
||||
return True, "Avatar saved"
|
||||
except Exception as e:
|
||||
return False, f"Failed to save avatar: {e}"
|
||||
|
||||
|
||||
def delete_avatar():
|
||||
"""Remove the stored avatar file. Returns (success, message). No-op
|
||||
when there's nothing to delete (still returns success)."""
|
||||
try:
|
||||
if AVATAR_FILE.exists():
|
||||
AVATAR_FILE.unlink()
|
||||
if AVATAR_CONTENT_TYPE_FILE.exists():
|
||||
AVATAR_CONTENT_TYPE_FILE.unlink()
|
||||
return True, "Avatar removed"
|
||||
except Exception as e:
|
||||
return False, f"Failed to remove avatar: {e}"
|
||||
|
||||
|
||||
def get_avatar_bytes():
|
||||
"""Return (bytes, content_type) for the stored avatar, or (None, None)
|
||||
if no avatar is set or the file is unreadable. The caller is
|
||||
responsible for the HTTP response; this only handles the I/O."""
|
||||
if not AVATAR_FILE.exists():
|
||||
return None, None
|
||||
try:
|
||||
data = AVATAR_FILE.read_bytes()
|
||||
except OSError:
|
||||
return None, None
|
||||
content_type = "application/octet-stream"
|
||||
try:
|
||||
if AVATAR_CONTENT_TYPE_FILE.exists():
|
||||
ct = AVATAR_CONTENT_TYPE_FILE.read_text().strip()
|
||||
if ct in AVATAR_ALLOWED_CONTENT_TYPES:
|
||||
content_type = ct
|
||||
except OSError:
|
||||
pass
|
||||
return data, content_type
|
||||
|
||||
@@ -383,6 +383,14 @@ pip3 install --target "$APP_DIR/usr/lib/python3/dist-packages" --upgrade \
|
||||
gevent-websocket>=0.10.1 \
|
||||
greenlet>=3.0.0
|
||||
|
||||
# Phase 3c: Apprise notification hub (issue #207). One library handles
|
||||
# ~80 notification services behind a single URL scheme (`tgram://`,
|
||||
# `discord://`, `ntfy://`, `matrix://`, etc.). Used by the optional
|
||||
# `apprise` channel in notification_channels.py for operators who want
|
||||
# to reach a service we don't support natively.
|
||||
pip3 install --target "$APP_DIR/usr/lib/python3/dist-packages" --upgrade \
|
||||
apprise>=1.7.0
|
||||
|
||||
cat > "$APP_DIR/usr/lib/python3/dist-packages/cgi.py" << 'PYEOF'
|
||||
from typing import Tuple, Dict
|
||||
try:
|
||||
|
||||
@@ -670,3 +670,128 @@ def revoke_api_token_route(token_id):
|
||||
return jsonify({"success": False, "message": message}), 400
|
||||
except Exception as e:
|
||||
return jsonify({"success": False, "message": str(e)}), 500
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# User profile endpoints (Fase 2, v1.2.2)
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# GET /api/auth/profile → username + display_name + has_avatar
|
||||
# PUT /api/auth/profile → update display_name (body: {display_name})
|
||||
# GET /api/auth/profile/avatar → serve the avatar bytes (image/*)
|
||||
# POST /api/auth/profile/avatar → upload new avatar (multipart 'file')
|
||||
# DELETE /api/auth/profile/avatar → remove the stored avatar
|
||||
#
|
||||
# All four require auth via @require_auth. The avatar GET also requires
|
||||
# auth because the file lives next to the auth state on disk and we
|
||||
# don't want it leaked to arbitrary callers — the avatar URL is meant
|
||||
# to be fetched by an already-authenticated session.
|
||||
|
||||
|
||||
@auth_bp.route('/api/auth/profile', methods=['GET'])
|
||||
@require_auth
|
||||
def get_profile():
|
||||
"""Return the active user's profile (username + display name + avatar
|
||||
metadata). Falls back to None values when auth isn't configured."""
|
||||
try:
|
||||
profile = auth_manager.get_user_profile()
|
||||
return jsonify({
|
||||
"success": True,
|
||||
**profile,
|
||||
})
|
||||
except Exception as e:
|
||||
return jsonify({"success": False, "message": str(e)}), 500
|
||||
|
||||
|
||||
@auth_bp.route('/api/auth/profile', methods=['PUT'])
|
||||
@require_auth
|
||||
def update_profile():
|
||||
"""Update display_name. Body: {"display_name": "..."}. Empty string
|
||||
clears it (the dropdown then renders the raw username)."""
|
||||
try:
|
||||
data = request.get_json(silent=True) or {}
|
||||
if "display_name" not in data:
|
||||
return jsonify({
|
||||
"success": False,
|
||||
"message": "Missing 'display_name' field",
|
||||
}), 400
|
||||
ok, message = auth_manager.set_display_name(data.get("display_name") or "")
|
||||
if not ok:
|
||||
return jsonify({"success": False, "message": message}), 400
|
||||
# Return the fresh profile so the frontend can update without a
|
||||
# second roundtrip.
|
||||
return jsonify({"success": True, "message": message, **auth_manager.get_user_profile()})
|
||||
except Exception as e:
|
||||
return jsonify({"success": False, "message": str(e)}), 500
|
||||
|
||||
|
||||
@auth_bp.route('/api/auth/profile/avatar', methods=['GET'])
|
||||
@require_auth
|
||||
def get_avatar():
|
||||
"""Serve the stored avatar bytes. Returns 404 if no avatar set."""
|
||||
try:
|
||||
from flask import Response
|
||||
data, content_type = auth_manager.get_avatar_bytes()
|
||||
if data is None:
|
||||
return jsonify({"success": False, "message": "No avatar set"}), 404
|
||||
return Response(
|
||||
data,
|
||||
mimetype=content_type,
|
||||
headers={
|
||||
# Allow short-window caching keyed by the URL — the
|
||||
# frontend appends `?v=<mtime>` so any update busts the
|
||||
# cache automatically.
|
||||
"Cache-Control": "private, max-age=60",
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
return jsonify({"success": False, "message": str(e)}), 500
|
||||
|
||||
|
||||
@auth_bp.route('/api/auth/profile/avatar', methods=['POST'])
|
||||
@require_auth
|
||||
def upload_avatar():
|
||||
"""Upload a new avatar image. Accepts either:
|
||||
• multipart/form-data with a `file` field (preferred), or
|
||||
• a raw image body with Content-Type set to image/png|jpeg|webp|gif.
|
||||
The size cap (2 MB) and the magic-number sniff happen in
|
||||
auth_manager.save_avatar — failures come back as 400 with a
|
||||
human-readable message."""
|
||||
try:
|
||||
content_bytes = None
|
||||
content_type = None
|
||||
|
||||
# Multipart path
|
||||
if request.files:
|
||||
file_storage = request.files.get("file")
|
||||
if file_storage is not None:
|
||||
content_bytes = file_storage.read()
|
||||
content_type = (file_storage.mimetype or "").lower()
|
||||
|
||||
# Raw body fallback
|
||||
if content_bytes is None:
|
||||
content_bytes = request.get_data(cache=False)
|
||||
content_type = (request.headers.get("Content-Type") or "").split(";", 1)[0].strip().lower()
|
||||
|
||||
if not content_bytes:
|
||||
return jsonify({"success": False, "message": "No image data received"}), 400
|
||||
|
||||
ok, message = auth_manager.save_avatar(content_bytes, content_type)
|
||||
if not ok:
|
||||
return jsonify({"success": False, "message": message}), 400
|
||||
return jsonify({"success": True, "message": message, **auth_manager.get_user_profile()})
|
||||
except Exception as e:
|
||||
return jsonify({"success": False, "message": str(e)}), 500
|
||||
|
||||
|
||||
@auth_bp.route('/api/auth/profile/avatar', methods=['DELETE'])
|
||||
@require_auth
|
||||
def remove_avatar():
|
||||
"""Remove the stored avatar (no-op if none set)."""
|
||||
try:
|
||||
ok, message = auth_manager.delete_avatar()
|
||||
if not ok:
|
||||
return jsonify({"success": False, "message": message}), 400
|
||||
return jsonify({"success": True, "message": message, **auth_manager.get_user_profile()})
|
||||
except Exception as e:
|
||||
return jsonify({"success": False, "message": str(e)}), 500
|
||||
|
||||
@@ -34,6 +34,7 @@ TOOL_METADATA = {
|
||||
'figurine': {'name': 'Figurine', 'function': 'configure_figurine', 'version': '1.0'},
|
||||
'fastfetch': {'name': 'Fastfetch', 'function': 'configure_fastfetch', 'version': '1.0'},
|
||||
'log2ram': {'name': 'Log2ram (SSD Protection)', 'function': 'configure_log2ram', 'version': '1.0'},
|
||||
'zfs_autotrim': {'name': 'ZFS Autotrim', 'function': 'enable_zfs_autotrim', 'version': '1.0'},
|
||||
'amd_fixes': {'name': 'AMD CPU (Ryzen/EPYC) fixes', 'function': 'apply_amd_fixes', 'version': '1.0'},
|
||||
'persistent_network': {'name': 'Setting persistent network interfaces', 'function': 'setup_persistent_network', 'version': '1.0'},
|
||||
'vfio_iommu': {'name': 'VFIO/IOMMU Passthrough', 'function': 'enable_vfio_iommu', 'version': '1.0'},
|
||||
|
||||
@@ -11198,6 +11198,105 @@ def api_vm_logs(vmid):
|
||||
pass
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
@app.route('/api/vms/<int:vmid>/firewall/log', methods=['GET'])
|
||||
@require_auth
|
||||
def api_vm_firewall_log(vmid):
|
||||
"""Per-VM/CT firewall log entries — proxies the official PVE API:
|
||||
`/nodes/<node>/{lxc,qemu}/<vmid>/firewall/log`. Returns the matching
|
||||
lines from `/var/log/pve-firewall.log` already filtered by VMID so
|
||||
the frontend doesn't have to parse the host-wide log itself.
|
||||
|
||||
Implements issue #14554 from the helper-scripts discussions —
|
||||
"view individual VM/CT firewall logs" — without writing any custom
|
||||
log parsing: PVE's API does it natively.
|
||||
|
||||
Query string:
|
||||
* `start` — 0-based offset into the log (default 0).
|
||||
* `limit` — number of lines to return (default 500, cap 5000).
|
||||
|
||||
Response shape mirrors the `/api/vms/<vmid>/logs` endpoint so the
|
||||
frontend log viewer can reuse the same renderer.
|
||||
"""
|
||||
try:
|
||||
start = max(int(request.args.get('start', 0)), 0)
|
||||
limit = min(max(int(request.args.get('limit', 500)), 1), 5000)
|
||||
except (TypeError, ValueError):
|
||||
return jsonify({'error': 'start/limit must be integers'}), 400
|
||||
|
||||
try:
|
||||
resources = get_cached_pvesh_cluster_resources_vm()
|
||||
if not resources:
|
||||
return jsonify({'error': 'Failed to enumerate cluster VMs'}), 500
|
||||
|
||||
vm_info = next((r for r in resources if r.get('vmid') == vmid), None)
|
||||
if not vm_info:
|
||||
return jsonify({'error': f'VM/LXC {vmid} not found'}), 404
|
||||
|
||||
vm_type = 'lxc' if vm_info.get('type') == 'lxc' else 'qemu'
|
||||
node = vm_info.get('node', 'pve')
|
||||
|
||||
log_result = subprocess.run(
|
||||
[
|
||||
'pvesh', 'get',
|
||||
f'/nodes/{node}/{vm_type}/{vmid}/firewall/log',
|
||||
'--start', str(start),
|
||||
'--limit', str(limit),
|
||||
'--output-format', 'json',
|
||||
],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
|
||||
if log_result.returncode != 0:
|
||||
stderr = (log_result.stderr or '').strip()
|
||||
# PVE returns this exact wording when the firewall is OFF
|
||||
# for the guest — surface it as a structured flag so the
|
||||
# frontend can render a "firewall disabled" callout instead
|
||||
# of a generic error toast.
|
||||
firewall_disabled = (
|
||||
'firewall' in stderr.lower() and 'disable' in stderr.lower()
|
||||
) or '404' in stderr
|
||||
return jsonify({
|
||||
'vmid': vmid,
|
||||
'name': vm_info.get('name'),
|
||||
'type': vm_type,
|
||||
'node': node,
|
||||
'firewall_enabled': not firewall_disabled,
|
||||
'logs': [],
|
||||
'error': stderr[:300] if stderr else 'pvesh returned non-zero',
|
||||
}), 200 if firewall_disabled else 500
|
||||
|
||||
entries = []
|
||||
try:
|
||||
data = json.loads(log_result.stdout or '[]')
|
||||
if isinstance(data, list):
|
||||
for row in data:
|
||||
if isinstance(row, dict):
|
||||
entries.append({
|
||||
'n': row.get('n'),
|
||||
't': row.get('t', ''),
|
||||
})
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
# Older PVE versions or oddly-shaped output: fall back to
|
||||
# plain text parsing, one entry per line.
|
||||
for i, line in enumerate(log_result.stdout.split('\n')):
|
||||
if line.strip():
|
||||
entries.append({'n': start + i, 't': line})
|
||||
|
||||
return jsonify({
|
||||
'vmid': vmid,
|
||||
'name': vm_info.get('name'),
|
||||
'type': vm_type,
|
||||
'node': node,
|
||||
'firewall_enabled': True,
|
||||
'log_lines': len(entries),
|
||||
'logs': entries,
|
||||
})
|
||||
except subprocess.TimeoutExpired:
|
||||
return jsonify({'error': 'pvesh timed out reading firewall log'}), 504
|
||||
except Exception as e:
|
||||
return jsonify({'error': str(e)}), 500
|
||||
|
||||
|
||||
@app.route('/api/vms/<int:vmid>/control', methods=['POST'])
|
||||
@require_auth
|
||||
def api_vm_control(vmid):
|
||||
|
||||
@@ -1021,6 +1021,120 @@ class EmailChannel(NotificationChannel):
|
||||
return result.get('success', False), result.get('error', '')
|
||||
|
||||
|
||||
# ─── Apprise ─────────────────────────────────────────────────────
|
||||
|
||||
class AppriseChannel(NotificationChannel):
|
||||
"""Apprise meta-channel — a single URL talks to ~80 services.
|
||||
|
||||
Apprise (https://github.com/caronc/apprise) is a Python library that
|
||||
normalises a wide catalogue of notification destinations behind a
|
||||
single URL scheme: `tgram://`, `discord://`, `slack://`, `gotify://`,
|
||||
`ntfy://`, `matrix://`, `mailto://`, `pushover://`, `signal://`, etc.
|
||||
The operator pastes one URL and ProxMenux delegates the transport.
|
||||
|
||||
Requested in issue #207 by @0berkampf. Implemented as a *separate
|
||||
channel type* (not a replacement for the native Telegram / Gotify /
|
||||
Discord / Email channels), so installs that already have a working
|
||||
native channel don't need to migrate — Apprise is opt-in for users
|
||||
who want to reach a service we don't support natively.
|
||||
|
||||
The library is loaded lazily on first send. Older deployments that
|
||||
haven't installed it yet surface a clean validation error instead
|
||||
of crashing the notification manager at import time.
|
||||
"""
|
||||
|
||||
def __init__(self, url: str):
|
||||
super().__init__()
|
||||
self.url = (url or '').strip()
|
||||
|
||||
# Lazy import so installs that haven't picked up the new dep yet
|
||||
# don't crash on module load. Each call re-imports cheaply — Python
|
||||
# caches the module reference after the first hit.
|
||||
def _load_apprise(self):
|
||||
try:
|
||||
import apprise # type: ignore
|
||||
return apprise
|
||||
except ImportError:
|
||||
return None
|
||||
|
||||
def validate_config(self) -> Tuple[bool, str]:
|
||||
if not self.url:
|
||||
return False, 'Apprise URL is required'
|
||||
apprise = self._load_apprise()
|
||||
if apprise is None:
|
||||
return False, (
|
||||
'apprise library not installed in this deployment. '
|
||||
'Reinstall ProxMenux Monitor or run `pip install apprise` '
|
||||
'inside the AppImage environment.'
|
||||
)
|
||||
# `add(url)` returns True only if Apprise recognised the scheme
|
||||
# — useful as a syntactic validation without sending anything.
|
||||
try:
|
||||
apobj = apprise.Apprise()
|
||||
ok = apobj.add(self.url)
|
||||
if not ok:
|
||||
return False, 'Apprise rejected the URL (unrecognised scheme or bad format)'
|
||||
except Exception as e:
|
||||
return False, f'Apprise rejected the URL: {e}'
|
||||
return True, ''
|
||||
|
||||
def _severity_to_notify_type(self, apprise_mod, severity: str):
|
||||
"""Map ProxMenux severities to Apprise NotifyType constants so
|
||||
services that render severity (e.g. Pushover priority, ntfy
|
||||
priority headers) get the right indicator."""
|
||||
sev = (severity or '').upper()
|
||||
if sev == 'CRITICAL':
|
||||
return apprise_mod.NotifyType.FAILURE
|
||||
if sev == 'WARNING':
|
||||
return apprise_mod.NotifyType.WARNING
|
||||
if sev == 'SUCCESS':
|
||||
return apprise_mod.NotifyType.SUCCESS
|
||||
return apprise_mod.NotifyType.INFO
|
||||
|
||||
def send(self, title: str, message: str, severity: str = 'INFO',
|
||||
data: Optional[Dict] = None) -> Dict[str, Any]:
|
||||
ok, err = self.validate_config()
|
||||
if not ok:
|
||||
return {'success': False, 'error': err, 'channel': 'apprise'}
|
||||
|
||||
# Rate limit (shared with the other channels) before dispatch.
|
||||
def _send_via_apprise() -> Tuple[int, str]:
|
||||
apprise = self._load_apprise()
|
||||
if apprise is None:
|
||||
# Shouldn't happen — validate_config caught it above —
|
||||
# but defend in depth so the retry loop reports cleanly.
|
||||
return 0, 'apprise library not available'
|
||||
try:
|
||||
apobj = apprise.Apprise()
|
||||
apobj.add(self.url)
|
||||
sent = apobj.notify(
|
||||
body=message or '',
|
||||
title=title or '',
|
||||
notify_type=self._severity_to_notify_type(apprise, severity),
|
||||
)
|
||||
# `notify` returns True iff at least one target accepted
|
||||
# the message. False means every URL endpoint rejected
|
||||
# — we don't get a per-URL status code back, hence the
|
||||
# opaque "Apprise rejected the notification".
|
||||
if sent:
|
||||
return 200, ''
|
||||
return 500, 'Apprise rejected the notification (transport failure)'
|
||||
except Exception as e:
|
||||
return 0, str(e)
|
||||
|
||||
result = self._send_with_retry(_send_via_apprise)
|
||||
result['channel'] = 'apprise'
|
||||
return result
|
||||
|
||||
def test(self) -> Tuple[bool, str]:
|
||||
result = self.send(
|
||||
title='ProxMenux Monitor — Test',
|
||||
message='Apprise channel is configured correctly. If you can read this, the URL is valid and the service accepted the notification.',
|
||||
severity='INFO',
|
||||
)
|
||||
return bool(result.get('success')), result.get('error') or ''
|
||||
|
||||
|
||||
# ─── Channel Factory ─────────────────────────────────────────────
|
||||
|
||||
CHANNEL_TYPES = {
|
||||
@@ -1045,16 +1159,21 @@ CHANNEL_TYPES = {
|
||||
'from_address', 'to_addresses', 'subject_prefix'],
|
||||
'class': EmailChannel,
|
||||
},
|
||||
'apprise': {
|
||||
'name': 'Apprise',
|
||||
'config_keys': ['url'],
|
||||
'class': AppriseChannel,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def create_channel(channel_type: str, config: Dict[str, str]) -> Optional[NotificationChannel]:
|
||||
"""Create a channel instance from type name and config dict.
|
||||
|
||||
|
||||
Args:
|
||||
channel_type: 'telegram', 'gotify', or 'discord'
|
||||
channel_type: 'telegram', 'gotify', 'discord', 'email', or 'apprise'
|
||||
config: Dict with channel-specific keys (see CHANNEL_TYPES)
|
||||
|
||||
|
||||
Returns:
|
||||
Channel instance or None if creation fails
|
||||
"""
|
||||
@@ -1076,6 +1195,8 @@ def create_channel(channel_type: str, config: Dict[str, str]) -> Optional[Notifi
|
||||
)
|
||||
elif channel_type == 'email':
|
||||
return EmailChannel(config)
|
||||
elif channel_type == 'apprise':
|
||||
return AppriseChannel(url=config.get('url', ''))
|
||||
except Exception as e:
|
||||
print(f"[NotificationChannels] Failed to create {channel_type}: {e}")
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user