mirror of
https://github.com/MacRimi/ProxMenux.git
synced 2026-04-18 01:52:20 +00:00
Update notification service
This commit is contained in:
@@ -26,6 +26,69 @@ from typing import Optional, Dict, Any, Tuple
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# ─── Shared State for Cross-Watcher Coordination ──────────────────
|
||||
|
||||
class _SharedState:
    """Module-level state shared between all watchers.

    Used to coordinate behavior when host-level events affect VM/CT events:
    - Suppress vm_stop/ct_stop during host shutdown (they're expected)
    - Aggregate vm_start/ct_start during startup into single message

    Elapsed time is measured with ``time.monotonic()`` rather than the wall
    clock, so a clock step (for example an NTP correction shortly after boot)
    cannot shorten or stretch either grace window.

    Args:
        shutdown_grace: Seconds after a detected shutdown/reboot during which
            the host is reported as shutting down.
        startup_grace: Seconds after construction during which VM/CT starts
            are considered part of the startup period.
    """

    def __init__(self, shutdown_grace: float = 120, startup_grace: float = 300):
        self._lock = threading.Lock()
        # Monotonic timestamp of the last detected shutdown; None = never seen.
        self._shutdown_time: Optional[float] = None
        self._shutdown_grace = shutdown_grace
        # Monotonic timestamp taken when this object was created.
        self._startup_time: float = time.monotonic()
        self._startup_grace = startup_grace
        # Recorded starts: [(vmid, vmname, 'vm'|'ct'), ...]
        self._startup_vms: list = []
        # Set once the recorded starts have been handed out for aggregation.
        self._startup_aggregated = False

    def mark_shutdown(self):
        """Called when system_shutdown or system_reboot is detected."""
        with self._lock:
            self._shutdown_time = time.monotonic()

    def is_host_shutting_down(self) -> bool:
        """Return True while within the grace period after mark_shutdown()."""
        with self._lock:
            if self._shutdown_time is None:
                return False
            elapsed = time.monotonic() - self._shutdown_time
            return elapsed < self._shutdown_grace

    def is_startup_period(self) -> bool:
        """Return True while within the startup aggregation period."""
        with self._lock:
            elapsed = time.monotonic() - self._startup_time
            return elapsed < self._startup_grace

    def add_startup_vm(self, vmid: str, vmname: str, vm_type: str):
        """Record a VM/CT start during startup period for later aggregation."""
        with self._lock:
            self._startup_vms.append((vmid, vmname, vm_type))

    def get_and_clear_startup_vms(self) -> list:
        """Return all recorded startup VMs/CTs and reset the record.

        Also marks the aggregation as done, even when nothing was recorded.
        """
        with self._lock:
            # Hand out the current list and start a fresh one; no copy needed.
            recorded, self._startup_vms = self._startup_vms, []
            self._startup_aggregated = True
            return recorded

    def has_startup_vms(self) -> bool:
        """Check if there are any startup VMs recorded."""
        with self._lock:
            return bool(self._startup_vms)

    def was_startup_aggregated(self) -> bool:
        """Check if startup aggregation already happened."""
        with self._lock:
            return self._startup_aggregated


# Global shared state instance
_shared_state = _SharedState()
|
||||
|
||||
|
||||
# ─── Event Object ─────────────────────────────────────────────────
|
||||
|
||||
class NotificationEvent:
|
||||
@@ -1173,11 +1236,15 @@ class JournalWatcher:
|
||||
break
|
||||
|
||||
if is_reboot:
|
||||
# Mark shutdown state to suppress VM/CT stop events
|
||||
_shared_state.mark_shutdown()
|
||||
self._emit('system_reboot', 'INFO', {
|
||||
'reason': 'The system is rebooting.',
|
||||
'hostname': self._hostname,
|
||||
}, entity='node', entity_id='')
|
||||
elif is_shutdown:
|
||||
# Mark shutdown state to suppress VM/CT stop events
|
||||
_shared_state.mark_shutdown()
|
||||
self._emit('system_shutdown', 'INFO', {
|
||||
'reason': 'The system is shutting down.',
|
||||
'hostname': self._hostname,
|
||||
@@ -1546,6 +1613,24 @@ class TaskWatcher:
|
||||
if self._is_vzdump_active():
|
||||
return
|
||||
|
||||
# Suppress VM/CT stop/shutdown during host shutdown/reboot.
|
||||
# When the host shuts down, all VMs/CTs stop - that's expected behavior,
|
||||
# not something that needs individual notifications.
|
||||
_SHUTDOWN_NOISE = {'vm_stop', 'vm_shutdown', 'ct_stop', 'ct_shutdown'}
|
||||
if event_type in _SHUTDOWN_NOISE and not is_error:
|
||||
if _shared_state.is_host_shutting_down():
|
||||
return
|
||||
|
||||
# During startup period, aggregate VM/CT starts into a single message.
|
||||
# Instead of N individual "VM X started" messages, collect them and
|
||||
# let PollingCollector emit one "System startup: X VMs, Y CTs started".
|
||||
_STARTUP_EVENTS = {'vm_start', 'ct_start'}
|
||||
if event_type in _STARTUP_EVENTS and not is_error:
|
||||
if _shared_state.is_startup_period():
|
||||
vm_type = 'ct' if event_type == 'ct_start' else 'vm'
|
||||
_shared_state.add_startup_vm(vmid, vmname or f'ID {vmid}', vm_type)
|
||||
return
|
||||
|
||||
self._queue.put(NotificationEvent(
|
||||
event_type, severity, data, source='tasks',
|
||||
entity=entity, entity_id=vmid,
|
||||
@@ -1684,6 +1769,7 @@ class PollingCollector:
|
||||
# Dict[error_key, dict(category, severity, reason, first_seen, error_key)]
|
||||
self._known_errors: Dict[str, dict] = {}
|
||||
self._first_poll_done = False
|
||||
self._startup_time = time.time() # Track when service started
|
||||
|
||||
def start(self):
|
||||
if self._running:
|
||||
@@ -1706,10 +1792,17 @@ class PollingCollector:
|
||||
|
||||
# ── Main loop ──────────────────────────────────────────────
|
||||
|
||||
# Startup grace period: ignore transient errors from certain categories
|
||||
# during the first N seconds after service start. Remote services like
|
||||
# PBS storage, VMs with qemu-guest-agent, etc. may take time to connect.
|
||||
STARTUP_GRACE_PERIOD = 180 # 3 minutes
|
||||
STARTUP_GRACE_CATEGORIES = {'storage', 'vms', 'network', 'pve_services'}
|
||||
|
||||
def _poll_loop(self):
|
||||
"""Main polling loop."""
|
||||
# Initial delay to let health monitor warm up
|
||||
for _ in range(15):
|
||||
# Initial delay to let health monitor and external services warm up.
|
||||
# PBS storage, NFS mounts, VMs with guest agent all need time after boot.
|
||||
for _ in range(60):
|
||||
if not self._running:
|
||||
return
|
||||
time.sleep(1)
|
||||
@@ -1750,6 +1843,9 @@ class PollingCollector:
|
||||
return
|
||||
self._check_ai_model_availability()
|
||||
|
||||
# Check if startup period ended and we have aggregated VMs to report
|
||||
self._check_startup_aggregation()
|
||||
|
||||
except Exception as e:
|
||||
print(f"[PollingCollector] Error: {e}")
|
||||
|
||||
@@ -1808,6 +1904,15 @@ class PollingCollector:
|
||||
if error.get('acknowledged') == 1:
|
||||
continue
|
||||
|
||||
# Startup grace period: ignore transient errors from categories that
|
||||
# typically need time to stabilize after boot (storage, VMs, network).
|
||||
# PBS storage, NFS mounts, VMs with qemu-guest-agent need time to connect.
|
||||
time_since_startup = now - self._startup_time
|
||||
if time_since_startup < self.STARTUP_GRACE_PERIOD:
|
||||
if category in self.STARTUP_GRACE_CATEGORIES:
|
||||
# Still within grace period for this category - skip notification
|
||||
continue
|
||||
|
||||
# On first poll, seed _last_notified for all existing errors so we
|
||||
# don't re-notify old persistent errors that were already sent before
|
||||
# a service restart. Only genuinely NEW errors (appearing after the
|
||||
@@ -2054,6 +2159,64 @@ class PollingCollector:
|
||||
self._known_errors = current_keys
|
||||
self._first_poll_done = True
|
||||
|
||||
def _check_startup_aggregation(self):
    """Emit one combined "system_startup" event for starts seen during startup.

    Does nothing while the shared state still reports the startup period, or
    once the aggregation has already been performed. Otherwise it drains the
    recorded VM/CT starts from the shared state and, if any were recorded,
    queues a single INFO event summarising them.
    """
    # Short-circuit keeps the original order: period check first, then the
    # "already aggregated" check only if the period is over.
    if _shared_state.is_startup_period() or _shared_state.was_startup_aggregated():
        return

    collected = _shared_state.get_and_clear_startup_vms()
    if not collected:
        return

    # Split the recorded starts by guest kind, keeping arrival order.
    vm_entries = []
    ct_entries = []
    for guest_id, guest_name, kind in collected:
        if kind == 'vm':
            vm_entries.append((guest_id, guest_name))
        elif kind == 'ct':
            ct_entries.append((guest_id, guest_name))

    n_vms = len(vm_entries)
    n_cts = len(ct_entries)
    n_total = n_vms + n_cts

    # Human-readable list, VMs first, capped at 10 entries for readability.
    ordered = vm_entries + ct_entries
    labels = [f'{guest_name} ({guest_id})' for guest_id, guest_name in ordered[:10]]
    if n_total > 10:
        labels.append(f'...and {n_total - 10} more')

    # Summary such as "2 VMs and 1 CT started"; zero counts are omitted.
    counted = []
    for count, noun in ((n_vms, 'VM'), (n_cts, 'CT')):
        if count:
            plural = '' if count == 1 else 's'
            counted.append(f'{count} {noun}{plural}')
    summary = ' and '.join(counted) + ' started'

    payload = {
        'hostname': self._hostname,
        'summary': summary,
        'vm_count': n_vms,
        'ct_count': n_cts,
        'total_count': n_total,
        'entity_list': ', '.join(labels),
        'reason': f'System startup completed: {summary}',
    }

    event = NotificationEvent(
        'system_startup', 'INFO', payload, source='polling',
        entity='node', entity_id='',
    )
    self._queue.put(event)
|
||||
|
||||
# ── Update check (enriched) ────────────────────────────────
|
||||
|
||||
# Proxmox-related package prefixes used for categorisation
|
||||
|
||||
Reference in New Issue
Block a user