update notification service

This commit is contained in:
MacRimi
2026-03-25 22:14:38 +01:00
parent 5aa5942bcd
commit 6da20aab05
7 changed files with 1967 additions and 529 deletions

View File

@@ -36,15 +36,15 @@ class _SharedState:
- Aggregate vm_start/ct_start during startup into single message
Two separate grace periods:
- startup_vm_grace: Time to aggregate VM/CT starts (shorter, 2 min)
- startup_health_grace: Time to suppress transient health errors (longer, 5 min)
- startup_vm_grace: Time to aggregate VM/CT starts (3 min)
- startup_health_grace: Time to suppress transient health errors (5 min)
"""
def __init__(self):
self._lock = threading.Lock()
self._shutdown_time: float = 0 # timestamp when shutdown was detected
self._shutdown_grace = 120 # suppress VM/CT stops for 2 minutes after shutdown detected
self._startup_time: float = time.time() # when module was loaded (service start)
self._startup_vm_grace = 120 # aggregate VM/CT starts for 2 minutes after startup
self._startup_vm_grace = 180 # aggregate VM/CT starts for 3 minutes after startup
self._startup_health_grace = 300 # suppress health warnings for 5 minutes after startup
self._startup_vms: list = [] # [(vmid, vmname, 'vm'|'ct'), ...]
self._startup_aggregated = False # have we already sent the aggregated message?
@@ -62,7 +62,7 @@ class _SharedState:
return (time.time() - self._shutdown_time) < self._shutdown_grace
def is_startup_period(self) -> bool:
"""Check if we're within the startup VM aggregation period (2 min)."""
"""Check if we're within the startup VM aggregation period (3 min)."""
with self._lock:
return (time.time() - self._startup_time) < self._startup_vm_grace
@@ -1640,9 +1640,13 @@ class TaskWatcher:
# let PollingCollector emit one "System startup: X VMs, Y CTs started".
_STARTUP_EVENTS = {'vm_start', 'ct_start'}
if event_type in _STARTUP_EVENTS and not is_error:
if _shared_state.is_startup_period():
is_startup = _shared_state.is_startup_period()
elapsed = time.time() - _shared_state._startup_time
print(f"[TaskWatcher] {event_type} for {vmid}: is_startup_period={is_startup}, elapsed={elapsed:.1f}s")
if is_startup:
vm_type = 'ct' if event_type == 'ct_start' else 'vm'
_shared_state.add_startup_vm(vmid, vmname or f'ID {vmid}', vm_type)
print(f"[TaskWatcher] Aggregated {event_type} for {vmid}, total pending: {len(_shared_state._startup_vms)}")
return
self._queue.put(NotificationEvent(
@@ -2185,11 +2189,16 @@ class PollingCollector:
if _shared_state.was_startup_aggregated():
return
print(f"[PollingCollector] Startup period ended, checking for aggregated VMs...")
# Get all collected startup VMs/CTs
startup_items = _shared_state.get_and_clear_startup_vms()
if not startup_items:
print(f"[PollingCollector] No VMs/CTs collected during startup period")
return
print(f"[PollingCollector] Emitting aggregated startup notification for {len(startup_items)} items")
# Count VMs and CTs
vms = [(vmid, name) for vmid, name, vtype in startup_items if vtype == 'vm']
cts = [(vmid, name) for vmid, name, vtype in startup_items if vtype == 'ct']
@@ -2280,7 +2289,7 @@ class PollingCollector:
if total == 0:
return
# ── Parse every Inst line ─────────────────────────────
# ── Parse every Inst line ────────────────<EFBFBD><EFBFBD>─────────────
all_pkgs: list[dict] = [] # {name, cur, new}
security_pkgs: list[dict] = []
pve_pkgs: list[dict] = []