mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-05-22 00:24:48 +00:00
1157 lines
44 KiB
Python
1157 lines
44 KiB
Python
"""ProxMenux-managed installs registry.
|
|
|
|
Single source of truth for "things ProxMenux installed (or detected as
|
|
already installed) and can check for updates on". Replaces the
|
|
type-specific polling we had before — every check now flows through
|
|
this module, so adding a new tracked install (Coral driver, Frigate,
|
|
etc.) is one entry in DETECTORS + one entry in CHECKERS.
|
|
|
|
Two operation modes:
|
|
|
|
* **Detection** — at AppImage startup and every 24h, every registered
|
|
``DETECTOR`` runs against the host. If the probe finds the thing
|
|
installed and it's not in the registry, we add it (with
|
|
``installed_by="detected"`` so the operator sees we autodiscovered
|
|
it). If it's in the registry but the probe fails, we mark it
|
|
``removed_at`` instead of deleting — keeps history and avoids
|
|
spurious notifications when a probe transiently fails.
|
|
|
|
* **Update check** — for every active entry, the matching ``CHECKER``
|
|
runs and updates ``current_version`` + ``available`` + ``latest``.
|
|
Each checker is responsible for its own per-source cache (the
|
|
Tailscale OCI checker memoises for 24h, NVIDIA for 7 days). The
|
|
notification poll loop reads the registry, emits a notification when
|
|
``available`` flips false→true for a (type, latest) pair it hasn't
|
|
notified yet.
|
|
|
|
Persistence is a single JSON file at
|
|
``/usr/local/share/proxmenux/managed_installs.json``. Atomic writes
|
|
via tmp+rename so a crash mid-write can't leave a half-written file.
|
|
|
|
The module is concurrency-safe: a single ``threading.RLock`` guards
|
|
every read-modify-write so the periodic detector and a request handler
|
|
calling ``get_registry()`` can run in parallel without stepping on
|
|
each other.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import datetime
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
from typing import Any, Callable, Optional
|
|
|
|
# ─── Storage ──────────────────────────────────────────────────────────────────
|
|
|
|
_DB_DIR = "/usr/local/share/proxmenux"
|
|
_REGISTRY_PATH = os.path.join(_DB_DIR, "managed_installs.json")
|
|
_SCHEMA_VERSION = 1
|
|
|
|
_lock = threading.RLock()
|
|
|
|
|
|
def _now_iso() -> str:
|
|
return datetime.datetime.utcnow().isoformat() + "Z"
|
|
|
|
|
|
def _read_registry() -> dict:
|
|
"""Load the JSON file. Returns the canonical empty shape on first
|
|
run / parse error / permission issue — callers always see a valid
|
|
dict."""
|
|
try:
|
|
with open(_REGISTRY_PATH, "r", encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
if isinstance(data, dict) and isinstance(data.get("items"), list):
|
|
return data
|
|
except (FileNotFoundError, IsADirectoryError, PermissionError):
|
|
pass
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
print(f"[ProxMenux] managed_installs read failed ({e}); starting fresh")
|
|
return {"version": _SCHEMA_VERSION, "items": []}
|
|
|
|
|
|
def _write_registry(reg: dict) -> bool:
|
|
"""Atomic write — tmp + rename. Never raises; returns False on any
|
|
OS-level failure so the caller can decide whether to retry."""
|
|
try:
|
|
os.makedirs(_DB_DIR, exist_ok=True)
|
|
tmp = _REGISTRY_PATH + ".tmp"
|
|
with open(tmp, "w", encoding="utf-8") as f:
|
|
json.dump(reg, f, indent=2, ensure_ascii=False)
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
os.replace(tmp, _REGISTRY_PATH)
|
|
return True
|
|
except OSError as e:
|
|
print(f"[ProxMenux] managed_installs write failed: {e}")
|
|
return False
|
|
|
|
|
|
# ─── Public read API ─────────────────────────────────────────────────────────
|
|
|
|
def get_registry() -> dict:
|
|
"""Return the full registry as a dict. Pure read — the caller can
|
|
inspect ``items`` freely. Don't mutate the returned dict."""
|
|
with _lock:
|
|
return _read_registry()
|
|
|
|
|
|
def get_active_items() -> list[dict]:
|
|
"""Items the host actually has installed right now (no
|
|
``removed_at``). Most callers want this, not the full history."""
|
|
with _lock:
|
|
reg = _read_registry()
|
|
return [it for it in reg.get("items", []) if not it.get("removed_at")]
|
|
|
|
|
|
def get_item(item_id: str) -> Optional[dict]:
|
|
with _lock:
|
|
reg = _read_registry()
|
|
for it in reg.get("items", []):
|
|
if it.get("id") == item_id:
|
|
return it
|
|
return None
|
|
|
|
|
|
# ─── DETECTORS — auto-discovery ──────────────────────────────────────────────
|
|
#
|
|
# Each detector is a `() -> Optional[dict]` that returns the *partial*
|
|
# entry shape (id, type, name, current_version, menu_label,
|
|
# menu_script — optional fields too) if the thing is installed on the
|
|
# host, or None if it's not. The framework merges this with the
|
|
# existing registry entry (preserving history) and rewrites if
|
|
# anything changed.
|
|
|
|
|
|
def _detect_nvidia_xfree86() -> Optional[dict]:
|
|
"""Detect a host-side NVIDIA driver via `nvidia-smi`."""
|
|
try:
|
|
proc = subprocess.run(
|
|
[
|
|
"nvidia-smi",
|
|
"--query-gpu=driver_version",
|
|
"--format=csv,noheader",
|
|
],
|
|
capture_output=True, text=True, timeout=5,
|
|
)
|
|
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
|
|
return None
|
|
if proc.returncode != 0:
|
|
return None
|
|
version = (proc.stdout or "").strip().splitlines()[0].strip() if proc.stdout else ""
|
|
if not re.match(r"^\d+\.\d+(\.\d+)?$", version):
|
|
return None
|
|
return {
|
|
"id": "nvidia-host",
|
|
"type": "nvidia_xfree86",
|
|
"name": "NVIDIA Host Driver",
|
|
"current_version": version,
|
|
"menu_label": "GPU & TPU → NVIDIA Driver",
|
|
"menu_script": "scripts/gpu_tpu/nvidia_installer.sh",
|
|
}
|
|
|
|
|
|
# ── Coral TPU host driver (PCIe gasket-dkms + USB libedgetpu1) ──
|
|
#
|
|
# Two install paths share the same registry entry because the user
|
|
# thinks of them as one "Coral driver" install. The detector returns
|
|
# one entry per path that is actually present on the host, so a system
|
|
# with both M.2 and USB Coral devices gets two entries — independent
|
|
# update streams (gasket-dkms from feranick/gasket-driver on GitHub,
|
|
# libedgetpu1-std from Google's apt repo).
|
|
|
|
|
|
def _detect_coral_host() -> list[dict]:
|
|
out: list[dict] = []
|
|
|
|
# PCIe / M.2 — gasket-dkms package version, falling back to the
|
|
# registered DKMS version if the package was force-removed but the
|
|
# built modules still exist.
|
|
pcie_version: Optional[str] = None
|
|
try:
|
|
r = subprocess.run(
|
|
["dpkg-query", "-W", "-f=${Status}|${Version}", "gasket-dkms"],
|
|
capture_output=True, text=True, timeout=3,
|
|
)
|
|
if r.returncode == 0 and "ok installed" in r.stdout:
|
|
pcie_version = r.stdout.split("|", 1)[1].strip()
|
|
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
|
|
pass
|
|
if not pcie_version:
|
|
try:
|
|
r = subprocess.run(
|
|
["dkms", "status"], capture_output=True, text=True, timeout=3,
|
|
)
|
|
if r.returncode == 0:
|
|
for line in r.stdout.splitlines():
|
|
if line.startswith("gasket"):
|
|
# "gasket, 1.0, ..." or "gasket/1.0, ..."
|
|
m = re.match(r"^gasket[, /]([^,\s]+)", line)
|
|
if m:
|
|
pcie_version = m.group(1)
|
|
break
|
|
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
|
|
pass
|
|
if pcie_version:
|
|
out.append({
|
|
"id": "coral-host-pcie",
|
|
"type": "coral_host",
|
|
"name": "Coral TPU Driver (gasket-dkms)",
|
|
"current_version": pcie_version,
|
|
"menu_label": "GPU & TPU → Coral TPU",
|
|
"menu_script": "scripts/gpu_tpu/install_coral.sh",
|
|
"_coral_variant": "pcie",
|
|
})
|
|
|
|
# USB — libedgetpu1-std (default) or libedgetpu1-max if the user
|
|
# opted into the overclocked runtime. Either one means the USB
|
|
# path is installed.
|
|
usb_version: Optional[str] = None
|
|
usb_pkg: Optional[str] = None
|
|
for pkg in ("libedgetpu1-std", "libedgetpu1-max"):
|
|
try:
|
|
r = subprocess.run(
|
|
["dpkg-query", "-W", "-f=${Status}|${Version}", pkg],
|
|
capture_output=True, text=True, timeout=3,
|
|
)
|
|
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
|
|
continue
|
|
if r.returncode == 0 and "ok installed" in r.stdout:
|
|
usb_version = r.stdout.split("|", 1)[1].strip()
|
|
usb_pkg = pkg
|
|
break
|
|
if usb_version and usb_pkg:
|
|
out.append({
|
|
"id": "coral-host-usb",
|
|
"type": "coral_host",
|
|
"name": f"Coral TPU Runtime ({usb_pkg})",
|
|
"current_version": usb_version,
|
|
"menu_label": "GPU & TPU → Coral TPU",
|
|
"menu_script": "scripts/gpu_tpu/install_coral.sh",
|
|
"_coral_variant": "usb",
|
|
"_coral_pkg": usb_pkg,
|
|
})
|
|
|
|
return out
|
|
|
|
|
|
def _detect_oci_apps() -> list[dict]:
|
|
"""Bridge to the OCI manager so every OCI-installed app shows up
|
|
in the registry without a per-app detector here. The OCI manager
|
|
is the source of truth for OCI-specific state — we just project a
|
|
subset into our registry shape."""
|
|
try:
|
|
import oci_manager
|
|
except Exception:
|
|
return []
|
|
try:
|
|
installed = oci_manager.list_installed_apps() or []
|
|
except Exception as e:
|
|
print(f"[ProxMenux] managed_installs OCI bridge failed: {e}")
|
|
return []
|
|
out: list[dict] = []
|
|
for app in installed:
|
|
app_id = app.get("app_id") or app.get("id")
|
|
if not app_id:
|
|
continue
|
|
out.append({
|
|
"id": f"oci:{app_id}",
|
|
"type": "oci_app",
|
|
"name": app.get("name") or app_id,
|
|
"current_version": None, # filled by checker
|
|
"menu_label": "Settings → Secure Gateway",
|
|
"menu_script": None, # OCI apps update via the dashboard, no bash script
|
|
# Stash the raw app_id so the checker can find it without
|
|
# parsing the prefixed registry id.
|
|
"_oci_app_id": app_id,
|
|
})
|
|
return out
|
|
|
|
|
|
# ── LXC containers (Phase 1: apt-based update detection) ────────────
|
|
#
|
|
# Each running Debian/Ubuntu CT becomes a registry entry of type "lxc".
|
|
# Detection is opt-in: gated on the `lxc_updates_available` notification
|
|
# being enabled somewhere, so the heavy `pct exec` work doesn't run on
|
|
# hosts where the user hasn't asked for this.
|
|
#
|
|
# Phase 2 hook: once helper-scripts metadata is integrated, entries can
|
|
# carry `_helper_script_app` so the checker swaps generic apt counting
|
|
# for app-specific upstream-release tracking (Vaultwarden, Jellyfin,
|
|
# etc.). For now every LXC uses the generic apt path.
|
|
|
|
_PCT_BIN = "/usr/sbin/pct"
|
|
_LXC_EXEC_TIMEOUT_SEC = 10
|
|
_LXC_OS_PROBE_TIMEOUT_SEC = 5
|
|
|
|
|
|
def _lxc_updates_notification_enabled() -> bool:
|
|
"""Return True if the user has enabled `lxc_updates_available` on
|
|
at least one configured channel. Used to gate the heavy detection
|
|
+ checker work — when disabled we don't touch any CT at all.
|
|
"""
|
|
try:
|
|
import notification_manager as _nm_mod
|
|
nm = _nm_mod.notification_manager
|
|
return bool(nm.is_event_enabled("lxc_updates_available"))
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _list_pve_lxcs() -> list[dict]:
|
|
"""Return basic info per LXC on this node via ``pct list``. Each
|
|
item is ``{vmid, status, name}``. Empty list on any failure — never
|
|
raises so the detector caller can continue.
|
|
"""
|
|
try:
|
|
r = subprocess.run(
|
|
[_PCT_BIN, "list"],
|
|
capture_output=True, text=True, timeout=5,
|
|
)
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
|
|
return []
|
|
if r.returncode != 0:
|
|
return []
|
|
|
|
out: list[dict] = []
|
|
for line in r.stdout.splitlines()[1:]: # skip header row
|
|
# `pct list` columns: VMID Status Lock Name
|
|
# `Lock` is empty most of the time, so split max 4 ways
|
|
parts = line.split(None, 3)
|
|
if len(parts) < 2:
|
|
continue
|
|
vmid = parts[0]
|
|
status = parts[1]
|
|
# Name is the last column; in unlocked rows the 3rd col may
|
|
# be the name itself if Lock was omitted by the formatter.
|
|
name = parts[-1] if len(parts) >= 3 else ""
|
|
if not vmid.isdigit():
|
|
continue
|
|
out.append({"vmid": vmid, "status": status, "name": name})
|
|
return out
|
|
|
|
|
|
_SUPPORTED_OS_FAMILIES = ("debian", "ubuntu", "alpine")
|
|
|
|
|
|
def _probe_lxc_os(vmid: str) -> Optional[str]:
|
|
"""Return a normalized family identifier (``debian`` / ``ubuntu`` /
|
|
``alpine``) by reading ``/etc/os-release`` inside the running CT.
|
|
Returns None for distributions whose package manager we don't yet
|
|
speak — those CTs are skipped in detection so the framework
|
|
doesn't keep retrying a checker we can't run.
|
|
|
|
Cached per CT in the registry — re-probed only when the entry has
|
|
no ``_os_family`` yet, since the OS rarely changes for the life of
|
|
a CT.
|
|
"""
|
|
try:
|
|
r = subprocess.run(
|
|
[_PCT_BIN, "exec", vmid, "--", "cat", "/etc/os-release"],
|
|
capture_output=True, text=True,
|
|
timeout=_LXC_OS_PROBE_TIMEOUT_SEC,
|
|
)
|
|
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
|
|
return None
|
|
if r.returncode != 0:
|
|
return None
|
|
text = r.stdout.lower()
|
|
if "id=ubuntu" in text:
|
|
return "ubuntu"
|
|
if "id=debian" in text or "id_like=debian" in text:
|
|
return "debian"
|
|
if "id=alpine" in text:
|
|
return "alpine"
|
|
# Future Phase 1.5: CentOS/Rocky/Alma (dnf check-update), Arch
|
|
# (checkupdates), openSUSE (zypper list-updates). Each needs a
|
|
# parser similar to apt/apk — skip silently for now.
|
|
return None
|
|
|
|
|
|
def _detect_lxc_containers() -> list[dict]:
|
|
"""Enumerate running Debian/Ubuntu CTs as registry entries.
|
|
|
|
OS detection is cached in the registry entry (`_os_family`), so the
|
|
expensive ``pct exec cat /etc/os-release`` only runs the first time
|
|
a CT is seen. CT reinstalls with a different OS will keep the old
|
|
family cached until the user resets the registry — acceptable
|
|
trade-off vs paying the probe cost every 24h cycle.
|
|
|
|
Detection runs unconditionally so the dashboard always reflects
|
|
pending updates on running CTs. The `lxc_updates_available`
|
|
notification toggle only gates the *delivery* of the notification
|
|
(see _check_managed_installs_updates in notification_events.py),
|
|
not the detection — that keeps the toggle semantics consistent with
|
|
every other update stream (NVIDIA, Coral, post-install).
|
|
"""
|
|
# Read existing registry so we can preserve cached `_os_family`.
|
|
# No lock needed here — we only inspect; the framework holds the
|
|
# write lock when it merges back our results in detect_and_register.
|
|
try:
|
|
existing = _read_registry().get("items", [])
|
|
except Exception:
|
|
existing = []
|
|
existing_by_id = {
|
|
it.get("id"): it for it in existing
|
|
if isinstance(it, dict) and it.get("type") == "lxc"
|
|
}
|
|
|
|
cts = _list_pve_lxcs()
|
|
out: list[dict] = []
|
|
for ct in cts:
|
|
if ct["status"] != "running":
|
|
continue
|
|
vmid = ct["vmid"]
|
|
cid = f"lxc:{vmid}"
|
|
prior = existing_by_id.get(cid) or {}
|
|
os_family = prior.get("_os_family")
|
|
if not os_family:
|
|
os_family = _probe_lxc_os(vmid)
|
|
if os_family not in _SUPPORTED_OS_FAMILIES:
|
|
# Distribution we don't yet have a package-manager
|
|
# parser for. Skip silently. The framework marks any
|
|
# existing entry as removed_at if it stops appearing
|
|
# in the detector output.
|
|
continue
|
|
out.append({
|
|
"id": cid,
|
|
"type": "lxc",
|
|
"name": ct.get("name") or f"CT-{vmid}",
|
|
"current_version": None, # apt has no single version
|
|
"menu_label": None, # user upgrades inside the CT
|
|
"menu_script": None,
|
|
"_vmid": vmid,
|
|
"_os_family": os_family,
|
|
# Phase 2 hook: populate `_helper_script_app` here once we
|
|
# learn how to read the community-scripts marker.
|
|
})
|
|
return out
|
|
|
|
|
|
# Detectors registered here. Each returns either a single entry dict
|
|
# or a list (for sources that yield multiple items, like OCI). The
|
|
# framework normalises both shapes.
|
|
_DETECTORS: list[Callable[[], Any]] = [
|
|
_detect_nvidia_xfree86,
|
|
_detect_coral_host,
|
|
_detect_oci_apps,
|
|
_detect_lxc_containers,
|
|
]
|
|
|
|
|
|
def _normalise_detector_result(result: Any) -> list[dict]:
|
|
if not result:
|
|
return []
|
|
if isinstance(result, dict):
|
|
return [result]
|
|
if isinstance(result, list):
|
|
return [r for r in result if isinstance(r, dict)]
|
|
return []
|
|
|
|
|
|
def detect_and_register() -> dict:
|
|
"""Run every detector, merge results into the registry, persist.
|
|
|
|
Behaviour per item:
|
|
* detected + not in registry → add, ``installed_by="detected"``
|
|
* detected + in registry as removed → reactivate (clear removed_at)
|
|
* detected + already active → refresh ``current_version`` and any
|
|
metadata that changed (e.g. menu_label evolved)
|
|
* not detected + active in registry → mark ``removed_at``
|
|
|
|
Returns the new registry.
|
|
"""
|
|
discovered: dict[str, dict] = {}
|
|
for detector in _DETECTORS:
|
|
try:
|
|
result = detector()
|
|
except Exception as e:
|
|
print(f"[ProxMenux] managed_installs detector {detector.__name__} failed: {e}")
|
|
continue
|
|
for entry in _normalise_detector_result(result):
|
|
if not entry.get("id"):
|
|
continue
|
|
discovered[entry["id"]] = entry
|
|
|
|
with _lock:
|
|
reg = _read_registry()
|
|
items: list[dict] = list(reg.get("items", []))
|
|
index = {it.get("id"): i for i, it in enumerate(items) if it.get("id")}
|
|
|
|
now = _now_iso()
|
|
|
|
# 1. Add new + reactivate / refresh existing.
|
|
for item_id, entry in discovered.items():
|
|
if item_id in index:
|
|
existing = items[index[item_id]]
|
|
# Reactivate if it was previously removed
|
|
if existing.get("removed_at"):
|
|
existing.pop("removed_at", None)
|
|
existing["reactivated_at"] = now
|
|
# Refresh metadata fields that may have evolved
|
|
for k in ("name", "current_version", "menu_label", "menu_script"):
|
|
if k in entry and entry[k] is not None:
|
|
existing[k] = entry[k]
|
|
# Preserve internal helpers like `_oci_app_id`
|
|
for k, v in entry.items():
|
|
if k.startswith("_"):
|
|
existing[k] = v
|
|
existing["last_seen"] = now
|
|
else:
|
|
# Brand new entry
|
|
new_entry = {
|
|
"id": entry["id"],
|
|
"type": entry.get("type", "unknown"),
|
|
"name": entry.get("name", entry["id"]),
|
|
"current_version": entry.get("current_version"),
|
|
"menu_label": entry.get("menu_label"),
|
|
"menu_script": entry.get("menu_script"),
|
|
"installed_by": "detected",
|
|
"first_seen": now,
|
|
"last_seen": now,
|
|
"update_check": {
|
|
"last_check": None,
|
|
"available": False,
|
|
"latest": None,
|
|
"error": None,
|
|
},
|
|
}
|
|
# Carry over internals (`_oci_app_id` etc.)
|
|
for k, v in entry.items():
|
|
if k.startswith("_"):
|
|
new_entry[k] = v
|
|
items.append(new_entry)
|
|
|
|
# 2. Mark missing items as removed (don't delete — preserve
|
|
# history so a reinstall doesn't lose the audit trail).
|
|
for it in items:
|
|
if not it.get("id") or it.get("removed_at"):
|
|
continue
|
|
if it["id"] not in discovered:
|
|
it["removed_at"] = now
|
|
|
|
reg["items"] = items
|
|
reg["version"] = _SCHEMA_VERSION
|
|
reg["last_detect"] = now
|
|
_write_registry(reg)
|
|
return reg
|
|
|
|
|
|
# ─── CHECKERS — per-type update probes ───────────────────────────────────────
|
|
#
|
|
# A checker takes a registry entry and returns the *update* part of
|
|
# the registry shape:
|
|
# {available, latest, last_check, error?}
|
|
# It must be idempotent and may use its own internal cache so we don't
|
|
# pay the upstream cost on every call.
|
|
|
|
|
|
def _check_oci_app(entry: dict) -> dict:
|
|
"""Delegate to oci_manager — already has its own 24h cache."""
|
|
app_id = entry.get("_oci_app_id") or entry.get("id", "").removeprefix("oci:")
|
|
if not app_id:
|
|
return {"available": False, "latest": None, "last_check": _now_iso(),
|
|
"error": "no app_id in registry entry"}
|
|
try:
|
|
import oci_manager
|
|
state = oci_manager.check_app_update_available(app_id, force=False)
|
|
except Exception as e:
|
|
return {"available": False, "latest": None, "last_check": _now_iso(),
|
|
"error": str(e)}
|
|
if state.get("error"):
|
|
return {"available": False, "latest": None, "last_check": _now_iso(),
|
|
"error": state["error"]}
|
|
return {
|
|
"available": bool(state.get("available")),
|
|
"latest": state.get("latest_version"),
|
|
"current": state.get("current_version"),
|
|
"last_check": state.get("last_checked_iso") or _now_iso(),
|
|
"error": None,
|
|
"_packages": state.get("packages") or [],
|
|
}
|
|
|
|
|
|
# ── NVIDIA driver checker ──
|
|
#
|
|
# Source of truth for what's available upstream:
|
|
# `https://download.nvidia.com/XFree86/Linux-x86_64/latest.txt`
|
|
# returns the single newest version, e.g. "580.105.08"
|
|
# `https://download.nvidia.com/XFree86/Linux-x86_64/`
|
|
# HTML directory listing — we scrape it for per-branch latest
|
|
# (so a user on 570.x gets 570.x's latest, not pushed to 580.x
|
|
# unless their kernel forces a branch upgrade).
|
|
#
|
|
# Cache TTL is 7 days because NVIDIA's release cadence on each branch
|
|
# is roughly monthly. The cache is in-memory only; AppImage restarts
|
|
# refresh it for free.
|
|
|
|
_NVIDIA_BASE = "https://download.nvidia.com/XFree86/Linux-x86_64"
|
|
_NVIDIA_CACHE_TTL = 7 * 86400
|
|
_nvidia_cache: dict[str, Any] = {"versions": [], "fetched_at": 0}
|
|
|
|
|
|
def _nvidia_kernel_compat() -> dict:
|
|
"""Python port of `get_kernel_compatibility_info` in the bash
|
|
installer. Returns ``{kernel, min_version, recommended_branch,
|
|
note}``. Kept identical to the bash matrix so the recommendation
|
|
here matches what the installer would do."""
|
|
try:
|
|
kernel = subprocess.run(
|
|
["uname", "-r"], capture_output=True, text=True, timeout=2,
|
|
).stdout.strip()
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
kernel = ""
|
|
parts = kernel.split(".") if kernel else []
|
|
try:
|
|
major = int(parts[0]) if len(parts) >= 1 else 0
|
|
minor = int(parts[1]) if len(parts) >= 2 else 0
|
|
except (ValueError, TypeError):
|
|
major, minor = 0, 0
|
|
|
|
if major >= 7 or (major == 6 and minor >= 17):
|
|
return {
|
|
"kernel": kernel,
|
|
"min_version": "580.105.08",
|
|
"recommended_branch": "580",
|
|
"note": (f"Kernel {kernel} requires NVIDIA driver 580.105.08 or "
|
|
f"newer (older 580.x builds fail to compile)"),
|
|
}
|
|
if major >= 6 and minor >= 8:
|
|
return {"kernel": kernel, "min_version": "550",
|
|
"recommended_branch": "580",
|
|
"note": f"Kernel {kernel} works with NVIDIA driver 550.x or newer"}
|
|
if major >= 6:
|
|
return {"kernel": kernel, "min_version": "535",
|
|
"recommended_branch": "550",
|
|
"note": f"Kernel {kernel} works with NVIDIA driver 535.x or newer"}
|
|
if major == 5 and minor >= 15:
|
|
return {"kernel": kernel, "min_version": "470",
|
|
"recommended_branch": "535",
|
|
"note": f"Kernel {kernel} works with NVIDIA driver 470.x or newer"}
|
|
return {"kernel": kernel, "min_version": "450",
|
|
"recommended_branch": "470",
|
|
"note": "For older kernels, compatibility may vary"}
|
|
|
|
|
|
def _version_tuple(v: str) -> tuple:
|
|
"""Convert ``580.105.08`` → ``(580, 105, 8)`` for comparison.
|
|
Pads to 3 components so ``580.82`` < ``580.105.08``."""
|
|
out = []
|
|
for chunk in v.split("."):
|
|
try:
|
|
out.append(int(chunk))
|
|
except (ValueError, TypeError):
|
|
out.append(0)
|
|
while len(out) < 3:
|
|
out.append(0)
|
|
return tuple(out[:3])
|
|
|
|
|
|
def _fetch_nvidia_versions(force: bool = False) -> list[str]:
|
|
"""Return the cached list of all upstream versions, or fetch fresh."""
|
|
now = time.time()
|
|
if not force and _nvidia_cache["versions"] and \
|
|
now - _nvidia_cache["fetched_at"] < _NVIDIA_CACHE_TTL:
|
|
return _nvidia_cache["versions"]
|
|
try:
|
|
req = urllib.request.Request(
|
|
_NVIDIA_BASE + "/",
|
|
headers={"User-Agent": "ProxMenux-Monitor/1.0"},
|
|
)
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
html = resp.read().decode("utf-8", errors="replace")
|
|
except Exception as e:
|
|
print(f"[ProxMenux] NVIDIA version fetch failed: {e}")
|
|
return _nvidia_cache.get("versions", [])
|
|
versions = sorted(
|
|
{m.group(1) for m in re.finditer(
|
|
r"""href=['"](\d+\.\d+(?:\.\d+)?)/?['"]""", html)},
|
|
key=_version_tuple,
|
|
reverse=True,
|
|
)
|
|
if versions:
|
|
_nvidia_cache["versions"] = versions
|
|
_nvidia_cache["fetched_at"] = now
|
|
return versions
|
|
|
|
|
|
def _is_compat_with_kernel(version: str, kernel_compat: dict) -> bool:
|
|
"""Compare ``version`` (e.g. ``580.105.08``) against the kernel
|
|
compatibility floor. Mirrors the bash ``is_version_compatible``
|
|
helper (full-triple compare when min is dotted, major-only otherwise)."""
|
|
min_str = kernel_compat.get("min_version", "0")
|
|
if "." in min_str and re.match(r"^\d+\.\d+\.\d+$", min_str):
|
|
return _version_tuple(version) >= _version_tuple(min_str)
|
|
# Single-major threshold like "535" or "550"
|
|
try:
|
|
ver_major = int(version.split(".")[0])
|
|
min_major = int(min_str)
|
|
except (ValueError, TypeError):
|
|
return True
|
|
return ver_major >= min_major
|
|
|
|
|
|
def _check_nvidia_xfree86(entry: dict) -> dict:
|
|
"""Compute the update state for a host NVIDIA driver entry.
|
|
|
|
Policy (Option C from the design discussion):
|
|
1. Same-branch newer version available → notify.
|
|
2. Current branch no longer compatible with current kernel →
|
|
notify a branch upgrade with explicit messaging.
|
|
"""
|
|
current = entry.get("current_version")
|
|
if not current or not re.match(r"^\d+\.\d+(\.\d+)?$", current):
|
|
return {"available": False, "latest": None,
|
|
"last_check": _now_iso(), "error": "no installed version"}
|
|
|
|
versions = _fetch_nvidia_versions()
|
|
if not versions:
|
|
return {"available": False, "latest": None,
|
|
"last_check": _now_iso(),
|
|
"error": "could not parse upstream version listing"}
|
|
|
|
kernel_compat = _nvidia_kernel_compat()
|
|
current_branch = current.split(".")[0]
|
|
|
|
same_branch = [v for v in versions if v.split(".")[0] == current_branch
|
|
and _is_compat_with_kernel(v, kernel_compat)]
|
|
same_branch_latest = same_branch[0] if same_branch else None
|
|
|
|
notify_branch_upgrade = False
|
|
branch_upgrade_target: Optional[str] = None
|
|
if not _is_compat_with_kernel(current, kernel_compat):
|
|
# Current branch / version no longer works with current kernel.
|
|
# Recommend the kernel-recommended branch's latest.
|
|
rec_branch = kernel_compat["recommended_branch"]
|
|
rec_branch_versions = [v for v in versions
|
|
if v.split(".")[0] == rec_branch
|
|
and _is_compat_with_kernel(v, kernel_compat)]
|
|
if rec_branch_versions:
|
|
branch_upgrade_target = rec_branch_versions[0]
|
|
notify_branch_upgrade = True
|
|
|
|
available = False
|
|
latest: Optional[str] = None
|
|
upgrade_kind = None # "patch" | "branch_upgrade" | None
|
|
|
|
if notify_branch_upgrade and branch_upgrade_target:
|
|
latest = branch_upgrade_target
|
|
available = True
|
|
upgrade_kind = "branch_upgrade"
|
|
elif same_branch_latest and \
|
|
_version_tuple(same_branch_latest) > _version_tuple(current):
|
|
latest = same_branch_latest
|
|
available = True
|
|
upgrade_kind = "patch"
|
|
|
|
return {
|
|
"available": available,
|
|
"latest": latest,
|
|
"last_check": _now_iso(),
|
|
"error": None,
|
|
"_upgrade_kind": upgrade_kind,
|
|
"_kernel": kernel_compat.get("kernel"),
|
|
"_kernel_note": kernel_compat.get("note"),
|
|
}
|
|
|
|
|
|
def _parse_apt_list_upgradable(text: str) -> list[dict]:
|
|
"""Parse the output of ``apt list --upgradable`` into structured rows.
|
|
|
|
Each upgradable line looks like::
|
|
|
|
package/release version arch [upgradable from: oldversion]
|
|
|
|
Returns a list of ``{name, current, latest, security}``. Lines that
|
|
can't be parsed are skipped; the header ``Listing...`` is ignored
|
|
because it lacks the ``[upgradable`` marker.
|
|
|
|
"security" flag is detected from the release/suite name (e.g.
|
|
``bookworm-security``, ``jammy-security``). Some derivatives don't
|
|
use that naming and will report security=False even when patches
|
|
are present — acceptable for Phase 1, refined later if needed.
|
|
"""
|
|
rows: list[dict] = []
|
|
for line in text.splitlines():
|
|
line = line.strip()
|
|
if not line or "[upgradable" not in line or "/" not in line:
|
|
continue
|
|
try:
|
|
head, _, tail = line.partition(" ")
|
|
name, _, release = head.partition("/")
|
|
tail_parts = tail.split()
|
|
if not tail_parts:
|
|
continue
|
|
new_ver = tail_parts[0]
|
|
old_ver = ""
|
|
if "from:" in line:
|
|
old_ver = line.split("from:", 1)[1].strip().rstrip("]").strip()
|
|
release_lower = release.lower()
|
|
is_security = "-security" in release_lower or "/security" in release_lower
|
|
rows.append({
|
|
"name": name,
|
|
"current": old_ver,
|
|
"latest": new_ver,
|
|
"security": is_security,
|
|
})
|
|
except Exception:
|
|
continue
|
|
return rows
|
|
|
|
|
|
def _parse_apk_list_upgradable(text: str) -> list[dict]:
|
|
"""Parse the output of ``apk list -u`` into structured rows.
|
|
|
|
Lines look like::
|
|
|
|
busybox-1.36.1-r29 x86_64 {busybox} (GPL-2.0-only) [upgradable from: busybox-1.36.1-r28]
|
|
|
|
apk smashes name + version into the leading token, so reliable
|
|
name/version splitting requires walking from the right (versions
|
|
end in ``-r<num>``). For the badge + notification we only need a
|
|
count and a representative sample, so we keep the parser tolerant
|
|
and surface the raw token as the package "name". Alpine's main
|
|
repos don't expose a separate "security" suite via apk metadata,
|
|
so we mark every row as ``security=False`` — security==0 always.
|
|
"""
|
|
rows: list[dict] = []
|
|
for line in text.splitlines():
|
|
line = line.strip()
|
|
if not line or "[upgradable" not in line:
|
|
continue
|
|
try:
|
|
first_tok = line.split(" ", 1)[0]
|
|
old = ""
|
|
if "from:" in line:
|
|
old = line.split("from:", 1)[1].strip().rstrip("]").strip()
|
|
rows.append({
|
|
"name": first_tok,
|
|
"current": old,
|
|
"latest": first_tok,
|
|
"security": False,
|
|
})
|
|
except Exception:
|
|
continue
|
|
return rows
|
|
|
|
|
|
def _run_pct_pkg_listing(vmid: str, cmd: str) -> tuple[bool, str, str]:
|
|
"""Run a package-listing command inside ``vmid`` via ``pct exec``.
|
|
Returns ``(ok, stdout, error_message)``. Centralises the timeout
|
|
and stderr handling so apt/apk callers stay symmetric.
|
|
"""
|
|
try:
|
|
r = subprocess.run(
|
|
[_PCT_BIN, "exec", vmid, "--", "sh", "-c", cmd],
|
|
capture_output=True, text=True,
|
|
timeout=_LXC_EXEC_TIMEOUT_SEC,
|
|
)
|
|
except subprocess.TimeoutExpired:
|
|
return False, "", f"{cmd.split()[0]} listing timed out"
|
|
except (FileNotFoundError, OSError) as e:
|
|
return False, "", str(e)
|
|
if r.returncode != 0:
|
|
return False, "", (r.stderr or "package listing failed").strip()[:200]
|
|
return True, r.stdout, ""
|
|
|
|
|
|
def _check_lxc_updates(entry: dict) -> dict:
|
|
"""Inspect pending package updates inside the LXC and report them.
|
|
|
|
Dispatches to the right package-manager parser based on the cached
|
|
``_os_family``. Uses the CT's existing metadata cache — never runs
|
|
``apt update`` / ``apk update`` from outside, so the user's own
|
|
update cadence (unattended-upgrades, cron) is preserved.
|
|
|
|
The dedup fingerprint (``latest``) combines count, security count
|
|
and the sorted top package names so a stable set of pending
|
|
updates doesn't re-notify daily, while a meaningfully different
|
|
update set does.
|
|
"""
|
|
vmid = entry.get("_vmid")
|
|
family = (entry.get("_os_family") or "").lower()
|
|
if not vmid:
|
|
return {
|
|
"available": False, "latest": None,
|
|
"last_check": _now_iso(), "error": "no vmid in entry",
|
|
}
|
|
|
|
if family in ("debian", "ubuntu"):
|
|
ok, stdout, err = _run_pct_pkg_listing(
|
|
vmid, "apt list --upgradable 2>/dev/null"
|
|
)
|
|
packages = _parse_apt_list_upgradable(stdout) if ok else []
|
|
elif family == "alpine":
|
|
ok, stdout, err = _run_pct_pkg_listing(
|
|
vmid, "apk list -u 2>/dev/null"
|
|
)
|
|
packages = _parse_apk_list_upgradable(stdout) if ok else []
|
|
else:
|
|
return {
|
|
"available": False, "latest": None,
|
|
"last_check": _now_iso(),
|
|
"error": f"unsupported family: {family}",
|
|
}
|
|
|
|
if not ok:
|
|
return {
|
|
"available": False, "latest": None,
|
|
"last_check": _now_iso(), "error": err,
|
|
}
|
|
|
|
count = len(packages)
|
|
sec_count = sum(1 for p in packages if p.get("security"))
|
|
available = count > 0
|
|
latest_fp = None
|
|
if available:
|
|
top_names = ",".join(sorted(p["name"] for p in packages)[:5])
|
|
latest_fp = f"{count}:{sec_count}:{top_names}"
|
|
|
|
return {
|
|
"available": available,
|
|
"latest": latest_fp,
|
|
"last_check": _now_iso(),
|
|
"error": None,
|
|
"_count": count,
|
|
"_security_count": sec_count,
|
|
"_packages": packages[:30], # cap to keep the registry compact
|
|
}
|
|
|
|
|
|
# ── Coral driver checker ──
|
|
#
|
|
# Two upstreams to track:
|
|
#
|
|
# PCIe (gasket-dkms) → feranick/gasket-driver on GitHub. The fork is
|
|
# actively maintained; releases are tagged like "v1.0-22". We pull
|
|
# the latest tag from the GitHub API and compare against the
|
|
# installed gasket-dkms Debian version. Because the Debian version
|
|
# string ("1.0-18") doesn't perfectly match the upstream tag
|
|
# ("v1.0-22"), we normalise both sides to the trailing "-N" build
|
|
# number for the comparison. Strict semver isn't workable here.
|
|
#
|
|
# USB (libedgetpu1-std/-max) → Google's apt repo. `apt-cache policy`
|
|
# reports installed + candidate versions in one shot, no internet
|
|
# round-trip required (apt's own cache is the canonical answer).
|
|
#
|
|
# Cache TTL for the GitHub call is 7 days — feranick's release cadence
|
|
# is roughly monthly, matching NVIDIA's pattern. The cache lives in
|
|
# memory so AppImage restarts refresh it for free.
|
|
|
|
_CORAL_GASKET_REPO = "feranick/gasket-driver"
|
|
_CORAL_CACHE_TTL = 7 * 86400
|
|
_coral_gasket_cache: dict[str, Any] = {"latest_tag": None, "fetched_at": 0}
|
|
|
|
|
|
def _coral_build_number(s: str) -> int:
|
|
"""Extract the trailing build number from a Coral version string.
|
|
|
|
Handles both upstream tag form (``v1.0-22``, ``1.0-22``) and the
|
|
Debian package form (``1.0-22``, ``1.0-18+pmx1``). Returns 0 if no
|
|
trailing ``-N`` segment exists — that pushes "no build number"
|
|
versions to the lowest rank so any tagged release shows as newer.
|
|
"""
|
|
if not s:
|
|
return 0
|
|
m = re.search(r"-(\d+)", s)
|
|
if not m:
|
|
return 0
|
|
try:
|
|
return int(m.group(1))
|
|
except (ValueError, TypeError):
|
|
return 0
|
|
|
|
|
|
def _fetch_gasket_latest_tag(force: bool = False) -> Optional[str]:
|
|
now = time.time()
|
|
if not force and _coral_gasket_cache["latest_tag"] and \
|
|
now - _coral_gasket_cache["fetched_at"] < _CORAL_CACHE_TTL:
|
|
return _coral_gasket_cache["latest_tag"]
|
|
url = f"https://api.github.com/repos/{_CORAL_GASKET_REPO}/tags?per_page=5"
|
|
try:
|
|
req = urllib.request.Request(
|
|
url,
|
|
headers={
|
|
"User-Agent": "ProxMenux-Monitor/1.0",
|
|
"Accept": "application/vnd.github+json",
|
|
},
|
|
)
|
|
with urllib.request.urlopen(req, timeout=15) as resp:
|
|
tags = json.loads(resp.read().decode("utf-8", errors="replace"))
|
|
except Exception as e:
|
|
print(f"[ProxMenux] gasket-driver tag fetch failed: {e}")
|
|
return _coral_gasket_cache.get("latest_tag")
|
|
if not isinstance(tags, list) or not tags:
|
|
return _coral_gasket_cache.get("latest_tag")
|
|
# Pick the tag with the highest trailing build number — feranick's
|
|
# tags are not strictly chronological, occasionally rebuilt.
|
|
best: Optional[str] = None
|
|
best_n = -1
|
|
for t in tags:
|
|
if not isinstance(t, dict):
|
|
continue
|
|
name = t.get("name") or ""
|
|
n = _coral_build_number(name)
|
|
if n > best_n:
|
|
best_n = n
|
|
best = name
|
|
if best:
|
|
_coral_gasket_cache["latest_tag"] = best
|
|
_coral_gasket_cache["fetched_at"] = now
|
|
return best
|
|
|
|
|
|
def _apt_cache_candidate(pkg: str) -> Optional[str]:
|
|
"""Return the candidate (newest available) version for ``pkg`` from
|
|
the local apt cache. Caller is responsible for the package existing —
|
|
a missing package returns None silently.
|
|
"""
|
|
try:
|
|
r = subprocess.run(
|
|
["apt-cache", "policy", pkg],
|
|
capture_output=True, text=True, timeout=5,
|
|
)
|
|
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
|
|
return None
|
|
if r.returncode != 0:
|
|
return None
|
|
for line in r.stdout.splitlines():
|
|
line = line.strip()
|
|
if line.startswith("Candidate:"):
|
|
cand = line.split(":", 1)[1].strip()
|
|
if cand and cand != "(none)":
|
|
return cand
|
|
return None
|
|
|
|
|
|
def _check_coral_host(entry: dict) -> dict:
|
|
variant = entry.get("_coral_variant") or ""
|
|
current = entry.get("current_version") or ""
|
|
|
|
if variant == "pcie":
|
|
latest_tag = _fetch_gasket_latest_tag()
|
|
if not latest_tag:
|
|
return {"available": False, "latest": None,
|
|
"last_check": _now_iso(),
|
|
"error": "could not fetch gasket-driver tags"}
|
|
cur_n = _coral_build_number(current)
|
|
new_n = _coral_build_number(latest_tag)
|
|
available = new_n > cur_n
|
|
return {
|
|
"available": available,
|
|
"latest": latest_tag if available else None,
|
|
"last_check": _now_iso(),
|
|
"error": None,
|
|
"_coral_variant": "pcie",
|
|
}
|
|
|
|
if variant == "usb":
|
|
pkg = entry.get("_coral_pkg") or "libedgetpu1-std"
|
|
candidate = _apt_cache_candidate(pkg)
|
|
if not candidate:
|
|
return {"available": False, "latest": None,
|
|
"last_check": _now_iso(),
|
|
"error": f"apt-cache policy returned no candidate for {pkg}"}
|
|
# Use plain string compare via the same build-number heuristic
|
|
# apt uses dpkg version compare upstream, but for the libedgetpu
|
|
# packages a trailing "-N" build number is the only thing that
|
|
# ever moves, so the build-number compare is enough here too.
|
|
# If it ever isn't, dpkg --compare-versions is the right call.
|
|
try:
|
|
cmp = subprocess.run(
|
|
["dpkg", "--compare-versions", current, "lt", candidate],
|
|
capture_output=True, timeout=3,
|
|
)
|
|
available = cmp.returncode == 0
|
|
except (FileNotFoundError, OSError, subprocess.TimeoutExpired):
|
|
available = candidate != current
|
|
return {
|
|
"available": available,
|
|
"latest": candidate if available else None,
|
|
"last_check": _now_iso(),
|
|
"error": None,
|
|
"_coral_variant": "usb",
|
|
"_coral_pkg": pkg,
|
|
}
|
|
|
|
return {"available": False, "latest": None,
|
|
"last_check": _now_iso(),
|
|
"error": f"unknown coral variant: {variant}"}
|
|
|
|
|
|
_CHECKERS: dict[str, Callable[[dict], dict]] = {
|
|
"oci_app": _check_oci_app,
|
|
"nvidia_xfree86": _check_nvidia_xfree86,
|
|
"coral_host": _check_coral_host,
|
|
"lxc": _check_lxc_updates,
|
|
}
|
|
|
|
|
|
def check_for_updates(force: bool = False) -> list[dict]:
|
|
"""Run every type-specific checker over active items, persist
|
|
the updated state, return the list of items that have an update
|
|
available right now.
|
|
|
|
The notification poller turns the returned list into events; the
|
|
UI reads ``get_active_items()`` to render the inline "update
|
|
available" line.
|
|
|
|
``force`` invalidates the per-source caches (currently only the
|
|
NVIDIA versions list — OCI keeps its own internal cache).
|
|
"""
|
|
if force:
|
|
_nvidia_cache["versions"] = []
|
|
_nvidia_cache["fetched_at"] = 0
|
|
|
|
updates_available: list[dict] = []
|
|
with _lock:
|
|
reg = _read_registry()
|
|
items = reg.get("items", [])
|
|
for it in items:
|
|
if it.get("removed_at"):
|
|
continue
|
|
checker = _CHECKERS.get(it.get("type"))
|
|
if not checker:
|
|
continue
|
|
try:
|
|
result = checker(it)
|
|
except Exception as e:
|
|
print(f"[ProxMenux] managed_installs checker failed for "
|
|
f"{it.get('id')}: {e}")
|
|
result = {"available": False, "latest": None,
|
|
"last_check": _now_iso(), "error": str(e)}
|
|
|
|
it["update_check"] = {
|
|
"available": bool(result.get("available")),
|
|
"latest": result.get("latest"),
|
|
"last_check": result.get("last_check") or _now_iso(),
|
|
"error": result.get("error"),
|
|
}
|
|
if result.get("current") and not it.get("current_version"):
|
|
it["current_version"] = result["current"]
|
|
# Per-checker extras carried through into the persisted
|
|
# `update_check` blob. Add new keys here when a future
|
|
# checker needs to surface fields beyond available/latest.
|
|
# `_count` + `_security_count` were missing originally, so
|
|
# the LXC checker's counts dropped on the floor and the
|
|
# frontend badge couldn't render.
|
|
for extra_key in ("_packages", "_upgrade_kind", "_kernel",
|
|
"_kernel_note", "_count", "_security_count",
|
|
"_coral_variant", "_coral_pkg"):
|
|
if extra_key in result:
|
|
it["update_check"][extra_key] = result[extra_key]
|
|
|
|
if it["update_check"]["available"]:
|
|
updates_available.append(it)
|
|
|
|
reg["items"] = items
|
|
reg["last_check_run"] = _now_iso()
|
|
_write_registry(reg)
|
|
|
|
return updates_available
|