Update AppImage

This commit is contained in:
MacRimi
2026-05-20 18:14:32 +02:00
parent 1087a87ea2
commit 4112323961
20 changed files with 1638 additions and 261 deletions
+328 -1
View File
@@ -189,12 +189,169 @@ def _detect_oci_apps() -> list[dict]:
return out
# ── LXC containers (Phase 1: apt-based update detection) ────────────
#
# Each running Debian/Ubuntu CT becomes a registry entry of type "lxc".
# Detection is opt-in: gated on the `lxc_updates_available` notification
# being enabled somewhere, so the heavy `pct exec` work doesn't run on
# hosts where the user hasn't asked for this.
#
# Phase 2 hook: once helper-scripts metadata is integrated, entries can
# carry `_helper_script_app` so the checker swaps generic apt counting
# for app-specific upstream-release tracking (Vaultwarden, Jellyfin,
# etc.). For now every LXC uses the generic apt path.
_PCT_BIN = "/usr/sbin/pct"
_LXC_EXEC_TIMEOUT_SEC = 10
_LXC_OS_PROBE_TIMEOUT_SEC = 5
def _lxc_updates_notification_enabled() -> bool:
"""Return True if the user has enabled `lxc_updates_available` on
at least one configured channel. Used to gate the heavy detection
+ checker work — when disabled we don't touch any CT at all.
"""
try:
import notification_manager as _nm_mod
nm = _nm_mod.notification_manager
return bool(nm.is_event_enabled("lxc_updates_available"))
except Exception:
return False
def _list_pve_lxcs() -> list[dict]:
"""Return basic info per LXC on this node via ``pct list``. Each
item is ``{vmid, status, name}``. Empty list on any failure — never
raises so the detector caller can continue.
"""
try:
r = subprocess.run(
[_PCT_BIN, "list"],
capture_output=True, text=True, timeout=5,
)
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return []
if r.returncode != 0:
return []
out: list[dict] = []
for line in r.stdout.splitlines()[1:]: # skip header row
# `pct list` columns: VMID Status Lock Name
# `Lock` is empty most of the time, so split max 4 ways
parts = line.split(None, 3)
if len(parts) < 2:
continue
vmid = parts[0]
status = parts[1]
# Name is the last column; in unlocked rows the 3rd col may
# be the name itself if Lock was omitted by the formatter.
name = parts[-1] if len(parts) >= 3 else ""
if not vmid.isdigit():
continue
out.append({"vmid": vmid, "status": status, "name": name})
return out
_SUPPORTED_OS_FAMILIES = ("debian", "ubuntu", "alpine")
def _probe_lxc_os(vmid: str) -> Optional[str]:
"""Return a normalized family identifier (``debian`` / ``ubuntu`` /
``alpine``) by reading ``/etc/os-release`` inside the running CT.
Returns None for distributions whose package manager we don't yet
speak — those CTs are skipped in detection so the framework
doesn't keep retrying a checker we can't run.
Cached per CT in the registry — re-probed only when the entry has
no ``_os_family`` yet, since the OS rarely changes for the life of
a CT.
"""
try:
r = subprocess.run(
[_PCT_BIN, "exec", vmid, "--", "cat", "/etc/os-release"],
capture_output=True, text=True,
timeout=_LXC_OS_PROBE_TIMEOUT_SEC,
)
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return None
if r.returncode != 0:
return None
text = r.stdout.lower()
if "id=ubuntu" in text:
return "ubuntu"
if "id=debian" in text or "id_like=debian" in text:
return "debian"
if "id=alpine" in text:
return "alpine"
# Future Phase 1.5: CentOS/Rocky/Alma (dnf check-update), Arch
# (checkupdates), openSUSE (zypper list-updates). Each needs a
# parser similar to apt/apk — skip silently for now.
return None
def _detect_lxc_containers() -> list[dict]:
"""Enumerate running Debian/Ubuntu CTs as registry entries.
OS detection is cached in the registry entry (`_os_family`), so the
expensive ``pct exec cat /etc/os-release`` only runs the first time
a CT is seen. CT reinstalls with a different OS will keep the old
family cached until the user resets the registry — acceptable
trade-off vs paying the probe cost every 24h cycle.
"""
if not _lxc_updates_notification_enabled():
return []
# Read existing registry so we can preserve cached `_os_family`.
# No lock needed here — we only inspect; the framework holds the
# write lock when it merges back our results in detect_and_register.
try:
existing = _read_registry().get("items", [])
except Exception:
existing = []
existing_by_id = {
it.get("id"): it for it in existing
if isinstance(it, dict) and it.get("type") == "lxc"
}
cts = _list_pve_lxcs()
out: list[dict] = []
for ct in cts:
if ct["status"] != "running":
continue
vmid = ct["vmid"]
cid = f"lxc:{vmid}"
prior = existing_by_id.get(cid) or {}
os_family = prior.get("_os_family")
if not os_family:
os_family = _probe_lxc_os(vmid)
if os_family not in _SUPPORTED_OS_FAMILIES:
# Distribution we don't yet have a package-manager
# parser for. Skip silently. The framework marks any
# existing entry as removed_at if it stops appearing
# in the detector output.
continue
out.append({
"id": cid,
"type": "lxc",
"name": ct.get("name") or f"CT-{vmid}",
"current_version": None, # apt has no single version
"menu_label": None, # user upgrades inside the CT
"menu_script": None,
"_vmid": vmid,
"_os_family": os_family,
# Phase 2 hook: populate `_helper_script_app` here once we
# learn how to read the community-scripts marker.
})
return out
# Detectors registered here. Each returns either a single entry dict
# or a list (for sources that yield multiple items, like OCI). The
# framework normalises both shapes.
_DETECTORS: list[Callable[[], Any]] = [
_detect_nvidia_xfree86,
_detect_oci_apps,
_detect_lxc_containers,
]
@@ -514,9 +671,173 @@ def _check_nvidia_xfree86(entry: dict) -> dict:
}
def _parse_apt_list_upgradable(text: str) -> list[dict]:
"""Parse the output of ``apt list --upgradable`` into structured rows.
Each upgradable line looks like::
package/release version arch [upgradable from: oldversion]
Returns a list of ``{name, current, latest, security}``. Lines that
can't be parsed are skipped; the header ``Listing...`` is ignored
because it lacks the ``[upgradable`` marker.
"security" flag is detected from the release/suite name (e.g.
``bookworm-security``, ``jammy-security``). Some derivatives don't
use that naming and will report security=False even when patches
are present — acceptable for Phase 1, refined later if needed.
"""
rows: list[dict] = []
for line in text.splitlines():
line = line.strip()
if not line or "[upgradable" not in line or "/" not in line:
continue
try:
head, _, tail = line.partition(" ")
name, _, release = head.partition("/")
tail_parts = tail.split()
if not tail_parts:
continue
new_ver = tail_parts[0]
old_ver = ""
if "from:" in line:
old_ver = line.split("from:", 1)[1].strip().rstrip("]").strip()
release_lower = release.lower()
is_security = "-security" in release_lower or "/security" in release_lower
rows.append({
"name": name,
"current": old_ver,
"latest": new_ver,
"security": is_security,
})
except Exception:
continue
return rows
def _parse_apk_list_upgradable(text: str) -> list[dict]:
"""Parse the output of ``apk list -u`` into structured rows.
Lines look like::
busybox-1.36.1-r29 x86_64 {busybox} (GPL-2.0-only) [upgradable from: busybox-1.36.1-r28]
apk smashes name + version into the leading token, so reliable
name/version splitting requires walking from the right (versions
end in ``-r<num>``). For the badge + notification we only need a
count and a representative sample, so we keep the parser tolerant
and surface the raw token as the package "name". Alpine's main
repos don't expose a separate "security" suite via apk metadata,
so we mark every row as ``security=False`` — security==0 always.
"""
rows: list[dict] = []
for line in text.splitlines():
line = line.strip()
if not line or "[upgradable" not in line:
continue
try:
first_tok = line.split(" ", 1)[0]
old = ""
if "from:" in line:
old = line.split("from:", 1)[1].strip().rstrip("]").strip()
rows.append({
"name": first_tok,
"current": old,
"latest": first_tok,
"security": False,
})
except Exception:
continue
return rows
def _run_pct_pkg_listing(vmid: str, cmd: str) -> tuple[bool, str, str]:
"""Run a package-listing command inside ``vmid`` via ``pct exec``.
Returns ``(ok, stdout, error_message)``. Centralises the timeout
and stderr handling so apt/apk callers stay symmetric.
"""
try:
r = subprocess.run(
[_PCT_BIN, "exec", vmid, "--", "sh", "-c", cmd],
capture_output=True, text=True,
timeout=_LXC_EXEC_TIMEOUT_SEC,
)
except subprocess.TimeoutExpired:
return False, "", f"{cmd.split()[0]} listing timed out"
except (FileNotFoundError, OSError) as e:
return False, "", str(e)
if r.returncode != 0:
return False, "", (r.stderr or "package listing failed").strip()[:200]
return True, r.stdout, ""
def _check_lxc_updates(entry: dict) -> dict:
"""Inspect pending package updates inside the LXC and report them.
Dispatches to the right package-manager parser based on the cached
``_os_family``. Uses the CT's existing metadata cache — never runs
``apt update`` / ``apk update`` from outside, so the user's own
update cadence (unattended-upgrades, cron) is preserved.
The dedup fingerprint (``latest``) combines count, security count
and the sorted top package names so a stable set of pending
updates doesn't re-notify daily, while a meaningfully different
update set does.
"""
vmid = entry.get("_vmid")
family = (entry.get("_os_family") or "").lower()
if not vmid:
return {
"available": False, "latest": None,
"last_check": _now_iso(), "error": "no vmid in entry",
}
if family in ("debian", "ubuntu"):
ok, stdout, err = _run_pct_pkg_listing(
vmid, "apt list --upgradable 2>/dev/null"
)
packages = _parse_apt_list_upgradable(stdout) if ok else []
elif family == "alpine":
ok, stdout, err = _run_pct_pkg_listing(
vmid, "apk list -u 2>/dev/null"
)
packages = _parse_apk_list_upgradable(stdout) if ok else []
else:
return {
"available": False, "latest": None,
"last_check": _now_iso(),
"error": f"unsupported family: {family}",
}
if not ok:
return {
"available": False, "latest": None,
"last_check": _now_iso(), "error": err,
}
count = len(packages)
sec_count = sum(1 for p in packages if p.get("security"))
available = count > 0
latest_fp = None
if available:
top_names = ",".join(sorted(p["name"] for p in packages)[:5])
latest_fp = f"{count}:{sec_count}:{top_names}"
return {
"available": available,
"latest": latest_fp,
"last_check": _now_iso(),
"error": None,
"_count": count,
"_security_count": sec_count,
"_packages": packages[:30], # cap to keep the registry compact
}
_CHECKERS: dict[str, Callable[[dict], dict]] = {
"oci_app": _check_oci_app,
"nvidia_xfree86": _check_nvidia_xfree86,
"lxc": _check_lxc_updates,
}
@@ -562,8 +883,14 @@ def check_for_updates(force: bool = False) -> list[dict]:
}
if result.get("current") and not it.get("current_version"):
it["current_version"] = result["current"]
# Per-checker extras carried through into the persisted
# `update_check` blob. Add new keys here when a future
# checker needs to surface fields beyond available/latest.
# `_count` + `_security_count` were missing originally, so
# the LXC checker's counts dropped on the floor and the
# frontend badge couldn't render.
for extra_key in ("_packages", "_upgrade_kind", "_kernel",
"_kernel_note"):
"_kernel_note", "_count", "_security_count"):
if extra_key in result:
it["update_check"][extra_key] = result[extra_key]